How do I run a SQL query on a DataStream that generates my custom type?

nikhilsimha
I have a stream of events of a custom type. I want to know how to convert
these events into Rows that can be queried. More specifically, how do I
attach type information to the stream of rows that is generated?

I have the following code:

```
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

    val eventStream : DataStream[Event] = ...
    val rowStream: DataStream[Row] = eventStream.map({e: Event => toRow(e)})
    tableEnv.registerDataStream("some_name", rowStream)

    tableEnv.sql("select bedrooms from some_name")
```

This code compiles, but it clearly won't work, because I never specify the
type or position of `bedrooms`.

So what API do I use to specify the TypeInformation of my rowStream?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How do I run a SQL query on a DataStream that generates my custom type?

Timo Walther
Hi,

In your case you don't have to convert to row if you don't want to. The
Table API will do automatic conversion once the stream of Event is
converted into a table. However, this only works if Event is a POJO.
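
As a rough sketch of that direct route, the stream can be registered without any Row conversion. This assumes Flink 1.4-era Scala APIs (`TableEnvironment.getTableEnvironment`, and `sqlQuery`, the newer name for the `sql` call in the question) and a hypothetical `Event` case class with `bedrooms` and `price` fields, since the thread never shows the real type. The Scala Table API maps case classes by field name, much like POJOs.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical event type (assumption): the real Event is not shown in the thread.
case class Event(bedrooms: Int, price: Double)

object DirectRegistration {
  // Made-up sample data, used only to make the sketch self-contained.
  val sampleEvents: Seq[Event] = Seq(Event(3, 250000.0), Event(2, 180000.0))

  def main(args: Array[String]): Unit = {
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

    val eventStream: DataStream[Event] = execEnv.fromCollection(sampleEvents)

    // Register the stream of Event directly; field names come from the case class.
    tableEnv.registerDataStream("some_name", eventStream)

    val result = tableEnv.sqlQuery("SELECT bedrooms FROM some_name")

    // Convert the result back to a stream and print it to stdout.
    tableEnv.toAppendStream[Row](result).print()
    execEnv.execute("direct-registration-sketch")
  }
}
```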

If you want to specify your own type information, your MapFunction can
implement the ResultTypeQueryable interface. In its getProducedType
method you can return your row information, for example Types.ROW_NAMED(...).
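
A minimal sketch of such a MapFunction might look like the following. The `Event` fields, the class name `EventToRow`, and the column names and types are assumptions for illustration; `Types` here is `org.apache.flink.api.common.typeinfo.Types`.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.types.Row

// Hypothetical event type (assumption): the real Event is not shown in the thread.
case class Event(bedrooms: Int, price: Double)

// Converts an Event to a Row and reports the Row's field names and types.
class EventToRow extends MapFunction[Event, Row] with ResultTypeQueryable[Row] {

  override def map(e: Event): Row = {
    val row = new Row(2)
    row.setField(0, Int.box(e.bedrooms)) // position 0 -> "bedrooms"
    row.setField(1, Double.box(e.price)) // position 1 -> "price"
    row
  }

  // Names and types must line up with the positions filled in map().
  override def getProducedType(): TypeInformation[Row] =
    Types.ROW_NAMED(Array("bedrooms", "price"), Types.INT, Types.DOUBLE)
}
```

Note that the Scala `map` also takes an implicit `TypeInformation[Row]`, so when using the Scala DataStream API it may be safest to pass the same value explicitly, e.g. `eventStream.map(mapper)(mapper.getProducedType())`, so that both sources of type information agree.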

Hope this helps.

Regards,
Timo

In reply to nikhilsimha's message of 2/14/18, 10:35 PM.


Re: How do I run a SQL query on a DataStream that generates my custom type?

Timo Walther
Or even easier:

You can specify the type directly after the map call:

eventStream.map({e: Event => toRow(e)})(Types.ROW_NAMED(...))
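
Spelled out end to end, that second argument list could look like the sketch below. It assumes Flink 1.4-era Scala APIs, a hypothetical `Event` case class, and a hand-written `toRow` helper; none of these details come from the thread, and `sqlQuery` is the newer name for the `sql` call used in the question.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical event type (assumption): the real Event is not shown in the thread.
case class Event(bedrooms: Int, price: Double)

object ExplicitRowType {
  // Field names and types of the rows produced by toRow, in positional order.
  val rowType: TypeInformation[Row] =
    Types.ROW_NAMED(Array("bedrooms", "price"), Types.INT, Types.DOUBLE)

  // Hypothetical stand-in for the toRow helper mentioned in the question.
  def toRow(e: Event): Row = {
    val row = new Row(2)
    row.setField(0, Int.box(e.bedrooms))
    row.setField(1, Double.box(e.price))
    row
  }

  def main(args: Array[String]): Unit = {
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

    val eventStream: DataStream[Event] =
      execEnv.fromElements(Event(3, 250000.0), Event(2, 180000.0))

    // Pass the row type explicitly instead of relying on the implicit one.
    val rowStream: DataStream[Row] = eventStream.map((e: Event) => toRow(e))(rowType)
    tableEnv.registerDataStream("some_name", rowStream)

    val result = tableEnv.sqlQuery("SELECT bedrooms FROM some_name")
    tableEnv.toAppendStream[Row](result).print()
    execEnv.execute("explicit-row-type-sketch")
  }
}
```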

Regards,
Timo


In reply to Timo Walther's message of 2/15/18, 9:55 AM.
