ClassCastException when using RowTypeInfo

madan
Hi,

Below is the sample code I am trying:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TypeInformation[] types = new TypeInformation[] {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};

String[] fieldNames = new String[]{"id", "name", "salary", "department"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
env.registerType(RowTypeInfo.class);

env.addSource(new EmployeeSourceFunction(), "samplesource", rowTypeInfo)
        .keyBy("department").sum("salary").addSink(new PrintSinkFunction<>());

public class EmployeeSourceFunction implements SourceFunction<Row> {
    // volatile because cancel() may be called from a different thread than run()
    private volatile boolean continueRead = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        while (continueRead) {
            // Emit three sample rows: (id, name, salary, department)
            for (int i = 0; i < 3 && continueRead; i++) {
                Row row = new Row(4);
                row.setField(0, Integer.valueOf(i));
                row.setField(1, "user" + i);
                // Must be a Double to match DOUBLE_TYPE_INFO declared for "salary"
                row.setField(2, 1000.0 * i);
                row.setField(3, "DEV");
                ctx.collect(row);
            }
            continueRead = false;
        }
    }

    @Override
    public void cancel() {
        continueRead = false;
    }
}

And I am getting the exception below:

java.lang.ClassCastException: org.apache.flink.api.java.typeutils.RowTypeInfo cannot be cast to org.apache.flink.api.java.typeutils.TupleTypeInfo
    at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:167)


I have checked FieldAccessorFactory.java:167:

if (typeInfo.isTupleType()) {
    TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;

RowTypeInfo returns 'true' for isTupleType(), but it cannot be cast to TupleTypeInfo.


Can someone please tell me whether I have configured something wrongly, or whether this is a bug in the code?


--
Thank you,
Madan.

Re: ClassCastException when using RowTypeInfo

Timo Walther
Hi Madan,

This is definitely a bug. The Row type was added mostly for the Table & SQL API and has not been tested with expression keys. In general, though, I would use a tuple in your case, as tuples are more efficient. The `registerType` call is only necessary for generic types serialized with Kryo.

I opened https://issues.apache.org/jira/browse/FLINK-8255. If you would like to fix it, I can assign it to you.
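
For anyone following along, here is a minimal, self-contained sketch of why the cast fails and what a guard against it could look like. It does not use Flink at all: `TypeInfo`, `TupleTypeInfoBase`, `TupleTypeInfo` and `RowTypeInfo` below are simplified stand-ins that only mirror the relevant part of the class hierarchy (in Flink, both `TupleTypeInfo` and `RowTypeInfo` extend `TupleTypeInfoBase`, which is why `isTupleType()` is true for both). The method names `unguardedAccessor` and `guardedAccessor` are hypothetical, and the guard is only one possible approach, not necessarily the fix that went into FLINK-8255.

```java
import java.util.Arrays;
import java.util.List;

public class FieldAccessorCastSketch {

    // Stand-in for Flink's TypeInformation, reduced to the one method that matters here.
    abstract static class TypeInfo {
        boolean isTupleType() { return false; }
    }

    // Stand-in for TupleTypeInfoBase: the common parent that reports "tuple type".
    abstract static class TupleTypeInfoBase extends TypeInfo {
        @Override
        boolean isTupleType() { return true; }
    }

    // Sibling classes: both are "tuple types", but neither is a subclass of the other.
    static class TupleTypeInfo extends TupleTypeInfoBase { }

    static class RowTypeInfo extends TupleTypeInfoBase {
        final List<String> fieldNames;

        RowTypeInfo(String... fieldNames) {
            this.fieldNames = Arrays.asList(fieldNames);
        }
    }

    // Mirrors the pattern from the stack trace: trusts isTupleType() and casts.
    static String unguardedAccessor(TypeInfo typeInfo) {
        if (typeInfo.isTupleType()) {
            // Throws ClassCastException when typeInfo is a RowTypeInfo.
            TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;
            return "tuple accessor";
        }
        return "other accessor";
    }

    // Hypothetical guard: dispatch on the concrete class instead of isTupleType().
    static String guardedAccessor(TypeInfo typeInfo, String field) {
        if (typeInfo instanceof RowTypeInfo) {
            int pos = ((RowTypeInfo) typeInfo).fieldNames.indexOf(field);
            if (pos < 0) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            return "row accessor for position " + pos;
        }
        if (typeInfo instanceof TupleTypeInfo) {
            return "tuple accessor";
        }
        return "other accessor";
    }

    public static void main(String[] args) {
        RowTypeInfo row = new RowTypeInfo("id", "name", "salary", "department");
        try {
            unguardedAccessor(row);
        } catch (ClassCastException e) {
            System.out.println("unguarded: ClassCastException");
        }
        System.out.println("guarded: " + guardedAccessor(row, "salary"));
    }
}
```

The point of the sketch is only that `isTupleType()` says "this type has positional fields", not "this object is a `TupleTypeInfo`", so a check on the concrete class is needed before casting.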

Thanks.

Regards,
Timo



On 12/13/17 at 4:16 PM, madan wrote: