Silly keyBy() error

Ron Crocker
I’m sure this should work, but I’m missing something… I searched the archive first, but didn’t have much luck finding any insights there.

TL;DR: org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.

I’m just getting started with a 1.0 implementation of a new task. It’s a pretty straightforward reduce job, but I’m running into a snag with creating a KeyedStream.

Here’s the graph:
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<TimesliceData> dataStream = see.addSource(new FlinkKafkaConsumer08<>(timesliceConstants.RESOLVED_TIMESLICE_DATA_KAFKA_TOPIC_NAME, new TimesliceDeserializer(), kafkaConsumerProperties));

        SingleOutputStreamOperator<AggregatableTimeslice> flattenedDataStream = dataStream
                .assignTimestampsAndWatermarks(new TimesliceTimestampExtractor())
                .flatMap(new TimesliceMapper());

        flattenedDataStream
                .keyBy("accountId", "agentId", "wideMetricId")
                .timeWindow(Time.seconds(60))
                .reduce(AggregatableTimeslice::aggregateWith)
                .print();

This fails on keyBy() with the message: 
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<com.newrelic.flink_aggregator.AggregatableTimeslice>) cannot be used as key.
TimesliceMapper is a concrete implementation of FlatMapFunction<TimesliceData, AggregatableTimeslice>, namely
public class TimesliceMapper implements FlatMapFunction<TimesliceData, AggregatableTimeslice> {
    @Override
    public void flatMap(TimesliceData value, Collector<AggregatableTimeslice> out) throws Exception {
        for (Timeslice timeslice : value.getTimeslices()) {
            out.collect(new AggregatableTimesliceImpl(timeslice, value, value.getAgentId()));
        }
    }
}
AggregatableTimesliceImpl is a simple concrete implementation of the AggregatableTimeslice interface:
public interface AggregatableTimeslice {
    int getAccountId();
    int getAgentId();
    long getWideMetricId();
    AggregatableTimesliceStats getTimesliceStats();
}
Ron
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835

Re: Silly keyBy() error

stefanobaghino
Hi Ron,

Not all classes can be used to `keyBy` a stream. In your case in particular, it looks like you have to implement Comparable so that Flink can correctly key your stream on AggregatableTimesliceImpl.

Take a look at the first slides here for more information on keying: http://dataartisans.github.io/flink-training/dataStreamAdvanced/slides.html

Hope that helps.

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Silly keyBy() error

Ron Crocker
Thanks Stefano -

That helped, but just led to different pain. I think I need to reconsider how I treat these things. Alas, the subject of a different thread.

Ron
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
M: +1 630 363 8835

Re: Silly keyBy() error

rmetzger0
Hey Ron,

for keying a stream by field names (like you did: .keyBy("accountId", "agentId", "wideMetricId")), the class needs to be recognized as a POJO by Flink.
From the documentation [1], a class is recognized as a POJO when:

  • The class must be public.

  • It must have a public constructor without arguments (default constructor).

  • All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().

  • The type of a field must be supported by Flink. At the moment, Flink uses Avro to serialize arbitrary objects (such as Date).

In your case, I believe the type you are trying to use as a POJO is AggregatableTimeslice, and that's just an interface (hence Flink is treating it as a GenericType, as you can see from the exception).
What you are trying to achieve should work once you turn the interface into a class.
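For illustration, here is a minimal sketch of what a keyable POJO could look like (the class name is hypothetical and the stats field is left out; the field names follow your getters):

public class AggregatableTimeslicePojo {
    // Sketch only: public class, public no-argument constructor, and
    // public getters/setters, so Flink recognizes it as a POJO.
    private int accountId;
    private int agentId;
    private long wideMetricId;

    public AggregatableTimeslicePojo() {} // default constructor required for POJO recognition

    public int getAccountId() { return accountId; }
    public void setAccountId(int accountId) { this.accountId = accountId; }

    public int getAgentId() { return agentId; }
    public void setAgentId(int agentId) { this.agentId = agentId; }

    public long getWideMetricId() { return wideMetricId; }
    public void setWideMetricId(long wideMetricId) { this.wideMetricId = wideMetricId; }
}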
Another option, and the one I would recommend in your situation, is to use a KeySelector function. It's basically a callback that returns the key fields from your type. Since you are using Java 8 anyway, implementing the KeySelector is not a lot of boilerplate.
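For example, a minimal sketch in terms of your pipeline (assuming org.apache.flink.api.java.functions.KeySelector and org.apache.flink.api.java.tuple.Tuple3 are imported; the Tuple3 key type is just one possible choice, and the rest of the job stays as it is):

        // Sketch only: key by (accountId, agentId, wideMetricId) via a KeySelector,
        // so the element type does not have to be a POJO.
        flattenedDataStream
                .keyBy(new KeySelector<AggregatableTimeslice, Tuple3<Integer, Integer, Long>>() {
                    @Override
                    public Tuple3<Integer, Integer, Long> getKey(AggregatableTimeslice ts) throws Exception {
                        return new Tuple3<>(ts.getAccountId(), ts.getAgentId(), ts.getWideMetricId());
                    }
                })
                .timeWindow(Time.seconds(60))
                .reduce(AggregatableTimeslice::aggregateWith)
                .print();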

I would also recommend registering the implementations of the AggregatableTimeslice interface with Kryo for better performance: env.registerType(AggregatableTimesliceImpl.class);
This will make the serialization with Kryo much faster.
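In terms of the snippet above, that would be something like (just a sketch; see is the StreamExecutionEnvironment you already create):

        see.registerType(AggregatableTimesliceImpl.class); // register the concrete type so it serializes more efficiently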

Regards,
