I have a stream of events of a custom type. I want to know how to convert these events into Rows that can be queried. More specifically how do I attach type information to the stream of rows that is generated?

I have the following code

```
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

val eventStream : DataStream[Event] = ...
val rowStream: DataStream[Row] = eventStream.map({e: Event => toRow(e)})
tableEnv.registerDataStream("some_name", rowStream)

tableEnv.sql("select bedrooms from some_name")
```

This code compiles. But clearly wouldn't work because I didn't specify the type or position of `bedrooms`.

So what API do I use to specify the typeInformation of my rowStream?

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Hi,

In your case you don't have to convert to Row if you don't want to. The Table API converts the records automatically once the stream of Event is converted into a table. However, this only works if Event is a POJO.

If you want to specify your own type information, your MapFunction can implement the ResultTypeQueryable interface. In its getProducedType method you can return the row type information, for example Types.ROW_NAMED(...).

Hope this helps.

Regards,
Timo

On 2/14/18 at 10:35 PM, nikhilsimha wrote:
> So what API do I use to specify the typeInformation of my rowStream?
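
For illustration, the ResultTypeQueryable suggestion might look roughly like the sketch below. The `Event` case class and its fields `id` and `bedrooms` are hypothetical stand-ins, since the thread never shows the real type, and `EventToRow` is an illustrative name. Whether the Scala `map` wrapper prefers this over its implicit type information is worth verifying against your Flink version.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.types.Row

// Hypothetical event type; the real Event is not shown in the thread.
case class Event(id: String, bedrooms: Int)

// A MapFunction that also reports the named row type it produces.
class EventToRow extends MapFunction[Event, Row] with ResultTypeQueryable[Row] {

  // Field order here must match the order declared in getProducedType.
  override def map(e: Event): Row = Row.of(e.id, Int.box(e.bedrooms))

  // Names and types of the row fields, so that SQL can resolve `bedrooms`.
  override def getProducedType: TypeInformation[Row] =
    Types.ROW_NAMED(Array("id", "bedrooms"), Types.STRING, Types.INT)
}
```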

Or even easier:

You can specify the type right after the map call:

eventStream.map({e: Event => toRow(e)})(Types.ROW_NAMED(...))

Regards,
Timo

On 2/15/18 at 9:55 AM, Timo Walther wrote:
> If you want to specify own type information your MapFunction can
> implement the ResultTypeQueryable interface.
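
Put together with the code from the question, this could look like the following sketch. It assumes a Flink release from the time of this thread (where `TableEnvironment.getTableEnvironment` and `tableEnv.sql` are available) and a hypothetical `Event` with fields `id` and `bedrooms`; `toRow` and `RowTypeExample` are illustrative names as well. The second argument list of `map` is the implicit `TypeInformation` parameter, passed explicitly.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

// Hypothetical event type; the real Event is not shown in the thread.
case class Event(id: String, bedrooms: Int)

object RowTypeExample {

  // Named row type: field names and types, in the order toRow fills them.
  val rowType: TypeInformation[Row] =
    Types.ROW_NAMED(Array("id", "bedrooms"), Types.STRING, Types.INT)

  // Hypothetical conversion; field order must match rowType.
  def toRow(e: Event): Row = Row.of(e.id, Int.box(e.bedrooms))

  def main(args: Array[String]): Unit = {
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

    // Sample input standing in for the real event source.
    val eventStream: DataStream[Event] =
      execEnv.fromElements(Event("a", 2), Event("b", 3))

    // Pass the row type explicitly instead of relying on the implicit one.
    val rowStream: DataStream[Row] =
      eventStream.map((e: Event) => toRow(e))(rowType)

    tableEnv.registerDataStream("some_name", rowStream)

    // `sql` matches the question's code; later releases call this `sqlQuery`.
    val result = tableEnv.sql("select bedrooms from some_name")
    result.printSchema()
  }
}
```

Declaring `rowType` as an `implicit val` in scope of the `map` call should have the same effect without the second argument list.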