Odd error

robert

Getting this:

DataStream<RawRecord> stream = env
        .addSource(new FlinkKafkaConsumer08<>("raw", schema, properties))
        .setParallelism(30)
        .flatMap(new RecordSplit())
        .setParallelism(30)
        .name("Raw splitter")
        .keyBy("id", "keyByHelper", "schema");

Field expression must be equal to '*' or '_' for non-composite types.
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:342)
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:273)
com.company.ingest.stream.RawRecord.main(RawRecord.java:38)

I did add a new compareTo() method based on a long comparison:

    @Override
    public int compareTo(SchemaRecord o) {
        return Long.compare(this.keyByHelper, o.keyByHelper);
    }

I can't seem to get past this error...




Re: Odd error

rmetzger0
Hi,

I assume the flatMap(new RecordSplit()) is emitting a RawRecord.
Is it possible that you've also added an empty constructor to it while adding the compareTo() method?

I think the problem is that one of your types (probably the schema) is recognized as a nested POJO.
Check out this documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/api_concepts.html#define-keys-using-field-expressions 
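
For reference, the POJO rules described in that documentation (public class, public no-argument constructor, every field either public or reachable through a getter and a setter) can be checked roughly with plain reflection. The sketch below is only an approximation of those rules, not Flink's own type analysis. `PojoCheck`, `looksLikePojo` and the sample classes are made-up names for illustration.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper: approximates the documented POJO rules with reflection.
class PojoCheck {

    static boolean looksLikePojo(Class<?> type) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        // Rule 2: it needs a public constructor without arguments.
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every instance field is public or has a getter and a setter.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int m = f.getModifiers();
                if (Modifier.isStatic(m) || Modifier.isTransient(m) || f.isSynthetic()) {
                    continue; // not part of the record's data
                }
                if (!Modifier.isPublic(m) && !hasGetterAndSetter(type, f)) {
                    return false;
                }
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> type, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = hasPublicMethod(type, "get" + suffix) || hasPublicMethod(type, "is" + suffix);
        boolean setter = hasPublicMethod(type, "set" + suffix, f.getType());
        return getter && setter;
    }

    private static boolean hasPublicMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params); // only finds public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Sample record that satisfies all three rules.
    public static class GoodRecord {
        public String id;
        private long keyByHelper;

        public GoodRecord() {}

        public long getKeyByHelper() { return keyByHelper; }
        public void setKeyByHelper(long value) { this.keyByHelper = value; }
    }

    // Fails rule 2: no constructor without arguments.
    public static class NoDefaultCtor {
        public String id;

        public NoDefaultCtor(String id) { this.id = id; }
    }

    // Fails rule 3: private field without getter and setter.
    public static class PrivateField {
        private long keyByHelper;

        public PrivateField() {}
    }
}
```

If `PojoCheck.looksLikePojo(RawRecord.class)` came back false for your class, that would be consistent with the "non-composite types" message: a type that is not analyzed as a POJO cannot be keyed by field names such as "id".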

On Thu, Mar 23, 2017 at 4:35 AM, Telco Phone <[hidden email]> wrote:

[...]