Help on RowTypeInfo?

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Help on RowTypeInfo?

PaulWu
Please help how to "translate" table to DataStream in the fellowing code.

StreamTableEnvironment ste =
StreamTableEnvironment.getTableEnvironment(EXE_ENV);
        ste.registerDataStreamInternal("abc", stream);
        Table ts = ste.sql("select * from abc");
        ts = ts.as("count,word");
        System.out.println("ts=" + ts.getSchema());
        ts.printSchema();
        String[] names = new String[]{"count", "word"};
        TypeInformation[] types = new TypeInformation[]{Types.STRING,
Types.STRING};

        RowTypeInfo tpe = Types.ROW(types);
        DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true,
true, tpe);
        ds.print();

It throws an exception:
Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of
class org.apache.flink.api.java.typeutils.RowTypeInfo)
        at
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
        at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
        at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
        at
com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Help on RowTypeInfo?

Timo Walther
Hi,

the translate() method is an internal method. You can use
"toRetractStream(table, Row.class)" or "toAppendStream(table,
Row.class)" to convert you table into a stream. Make sure to use the
correct StreamTableEnvironment for your API:
org.apache.flink.table.api.java.StreamTableEnvironment

Regards,
Timo

Am 10/29/17 um 5:53 AM schrieb PaulWu:

> Please help how to "translate" table to DataStream in the fellowing code.
>
> StreamTableEnvironment ste =
> StreamTableEnvironment.getTableEnvironment(EXE_ENV);
>          ste.registerDataStreamInternal("abc", stream);
>          Table ts = ste.sql("select * from abc");
>          ts = ts.as("count,word");
>          System.out.println("ts=" + ts.getSchema());
>          ts.printSchema();
>          String[] names = new String[]{"count", "word"};
>          TypeInformation[] types = new TypeInformation[]{Types.STRING,
> Types.STRING};
>
>          RowTypeInfo tpe = Types.ROW(types);
>          DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true,
> true, tpe);
>          ds.print();
>
> It throws an exception:
> Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of
> class org.apache.flink.api.java.typeutils.RowTypeInfo)
> at
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
> at
> com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply | Threaded
Open this post in threaded view
|

Re: Help on RowTypeInfo?

PaulWu
Where is this method(from which class/object) you mentioned? I can only find

(new TableConversions(ts)).toRetractStream(TypeInformation.of(Row.class))

I use flink 1.3.2 java api, and weird the compilation error says this method
is not available although I can see it in the api and my ide tip.  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Help on RowTypeInfo?

PaulWu
I felt it is so messy... for the api design: Look at this...
StreamTableEnvironment can from three different packages, which I should
choose? I tried each of them and I just have one problem or another.

//import org.apache.flink.table.api.scala.StreamTableEnvironment;
//import org.apache.flink.table.api.java.StreamTableEnvironment;
//import org.apache.flink.table.api.StreamTableEnvironment;



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Help on RowTypeInfo?

PaulWu
Sorry for my rant...fairly new. Felt lost. The one (StreamTableEnvironment)
from java needs to use the constructor:
StreamTableEnvironment(StreamExecutionEnvironment execEnv, TableConfig
config) .

Now it works. Thanks. Still confused...why
 
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

works? Even it could have some same compiling error sometimes.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Help on RowTypeInfo?

Fabian Hueske-2
Hi Paul,

The *.scala.StreamTableEnvironment is for Scala programs, the *.java.StreamTableEnvironment for Java programs and the third is the common basis of the Scala and Java environment.
TableEnvironment.getTableEnvironment automatically creates the appropriate TableEnvironment based on the provided ExecuctionEnvironment.

The Table API and SQL are designed to be unified APIs for batch and streaming and we aim to support the same queries on batch and streaming tables.

Best, Fabian




2017-10-29 15:39 GMT+01:00 PaulWu <[hidden email]>:
Sorry for my rant...fairly new. Felt lost. The one (StreamTableEnvironment)
from java needs to use the constructor:
StreamTableEnvironment(StreamExecutionEnvironment execEnv, TableConfig
config) .

Now it works. Thanks. Still confused...why

BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

works? Even it could have some same compiling error sometimes.