Please help me "translate" a Table to a DataStream in the following code.

    StreamTableEnvironment ste = StreamTableEnvironment.getTableEnvironment(EXE_ENV);
    ste.registerDataStreamInternal("abc", stream);
    Table ts = ste.sql("select * from abc");
    ts = ts.as("count,word");
    System.out.println("ts=" + ts.getSchema());
    ts.printSchema();
    String[] names = new String[]{"count", "word"};
    TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING};
    RowTypeInfo tpe = Types.ROW(types);
    DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true, true, tpe);
    ds.print();

It throws an exception:

    Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of class org.apache.flink.api.java.typeutils.RowTypeInfo)
        at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
        at com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Hi,
the translate() method is an internal method. You can use "toRetractStream(table, Row.class)" or "toAppendStream(table, Row.class)" to convert your table into a stream. Make sure to use the correct StreamTableEnvironment for your API: org.apache.flink.table.api.java.StreamTableEnvironment

Regards,
Timo

On 10/29/17 at 5:53 AM, PaulWu wrote:
> Please help how to "translate" table to DataStream in the fellowing code.
> [...]

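To illustrate the difference between the two conversion methods named above: in the Flink 1.3 Java API, toAppendStream is meant for tables that only ever add rows, while toRetractStream returns a stream of (Boolean, record) pairs in which true adds a row and false retracts an earlier one. A plain projection such as "select * from abc" never revises earlier results, so the append form should be enough for it. The sketch below is a plain-Java model of that encoding with no Flink dependency; the class and method names (RetractStreamSketch, Change, countAsRetractStream, isAppendOnly) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractStreamSketch {

    /** One message of a retract-style stream: add == true inserts the row, add == false retracts it. */
    static final class Change {
        final boolean add;
        final String word;
        final long count;

        Change(boolean add, String word, long count) {
            this.add = add;
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + add + "," + word + "," + count + ")";
        }
    }

    /** Models a running word count: every update retracts the old result before adding the new one. */
    static List<Change> countAsRetractStream(List<String> words) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<Change> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            if (old != null) {
                out.add(new Change(false, w, old)); // withdraw the previously emitted count
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(w, updated);
            out.add(new Change(true, w, updated)); // emit the current count
        }
        return out;
    }

    /** A result can be treated as append-only exactly when it contains no retraction. */
    static boolean isAppendOnly(List<Change> changes) {
        for (Change c : changes) {
            if (!c.add) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Change> changes = countAsRetractStream(Arrays.asList("a", "b", "a"));
        for (Change c : changes) {
            System.out.println(c);
        }
        System.out.println("append-only: " + isAppendOnly(changes));
    }
}
```

An aggregation like the word count modeled here updates earlier results, which is the case where the retract form is needed.
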
Where is the method you mentioned (in which class or object)? I can only find

    (new TableConversions(ts)).toRetractStream(TypeInformation.of(Row.class))

I use the Flink 1.3.2 Java API. Oddly, the compiler reports that this method is not available, although I can see it in the API and in my IDE's suggestions.

The API design feels so messy to me. Look at this: StreamTableEnvironment can come from three different packages. Which one should I choose? I tried each of them and ran into one problem or another every time.

    //import org.apache.flink.table.api.scala.StreamTableEnvironment;
    //import org.apache.flink.table.api.java.StreamTableEnvironment;
    //import org.apache.flink.table.api.StreamTableEnvironment;

Sorry for my rant... I am fairly new and felt lost. The Java one (StreamTableEnvironment from the java package) needs to be created with the constructor StreamTableEnvironment(StreamExecutionEnvironment execEnv, TableConfig config). Now it works. Thanks.

I am still confused, though: why does

    BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

work? Even that sometimes produces a similar compile error.

Hi Paul,

The *.scala.StreamTableEnvironment is for Scala programs, the *.java.StreamTableEnvironment is for Java programs, and the third is the common base of the Scala and Java environments. TableEnvironment.getTableEnvironment automatically creates the appropriate TableEnvironment based on the provided ExecutionEnvironment.

The Table API and SQL are designed to be unified APIs for batch and streaming, and we aim to support the same queries on batch and streaming tables.

Best,
Fabian

2017-10-29 15:39 GMT+01:00 PaulWu <[hidden email]>:
> Sorry for my rant...fairly new. Felt lost. The one (StreamTableEnvironment)

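One way a factory like the one described above can pick the right environment is through overloads that each return the specific subclass, so the caller needs no cast. The sketch below is a plain-Java model with no Flink dependency, and every name in it (FactoryDispatchSketch, BatchEnv, StreamEnv, TableEnv, BatchTableEnv, StreamTableEnv) is a hypothetical stand-in, not Flink's actual class. It also shows a possible source of the earlier compile errors, stated here as an assumption: if the variable is declared with the common base type, methods that exist only on the language-specific subclass are not visible to the compiler.

```java
public class FactoryDispatchSketch {

    // Hypothetical stand-ins for a batch and a streaming execution environment.
    static class BatchEnv {}
    static class StreamEnv {}

    /** Common base, playing the role of the shared table environment. */
    abstract static class TableEnv {
        abstract String kind();

        // The overload is chosen at compile time from the static type of the argument,
        // and each overload declares the specific subclass as its return type.
        static BatchTableEnv getTableEnvironment(BatchEnv env) {
            return new BatchTableEnv();
        }

        static StreamTableEnv getTableEnvironment(StreamEnv env) {
            return new StreamTableEnv();
        }
    }

    static class BatchTableEnv extends TableEnv {
        @Override
        String kind() {
            return "batch";
        }
    }

    static class StreamTableEnv extends TableEnv {
        @Override
        String kind() {
            return "stream";
        }

        // Exists only on the streaming subclass, not on the common base.
        String toAppendStream(String tableName) {
            return "append:" + tableName;
        }
    }

    public static void main(String[] args) {
        // No cast needed: the batch overload already returns BatchTableEnv.
        BatchTableEnv batch = TableEnv.getTableEnvironment(new BatchEnv());
        System.out.println(batch.kind());

        // Declared with the subclass type, so the subclass-only method is visible.
        StreamTableEnv stream = TableEnv.getTableEnvironment(new StreamEnv());
        System.out.println(stream.toAppendStream("abc"));

        // Declared with the base type: base.toAppendStream("abc") would not compile here.
        TableEnv base = TableEnv.getTableEnvironment(new StreamEnv());
        System.out.println(base.kind());
    }
}
```
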