how to convert DataStream<RowData> to Table

vtygoss

Hi all,

I have a scenario where I need to process OGG log data in Kafka using Flink SQL. I can convert the OGG log stream to a DataStream<RowData> in which each event carries a RowKind, but I have trouble converting the DataStream<RowData> to a Table.

As a test, I tried both the StreamTableEnvironment#fromDataStream and createTemporaryView APIs. In both cases the resulting TableSchema is:

```
root
 |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>')
```

The schema I want is:

```
root
 |-- column1: Type
 |-- column2: Type
 ...
```

How can I convert a DataStream<RowData> with RowKind to a Table?
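
For context on what the RowKind flag implies for the resulting table, here is a minimal plain-Java sketch (no Flink dependency) of how a stream of changelog events folds into table contents. The `Kind` enum, the `Change` class and the `materialize` method are hypothetical stand-ins introduced for illustration. `Kind` only mirrors the four values of Flink's `RowKind`, the string key is an assumed primary key, and none of this is Flink's actual implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {

    // Local stand-in mirroring the four values of Flink's RowKind (assumption: not the Flink class).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One changelog event: a kind, an assumed primary key, and the row's column values.
    static final class Change {
        final Kind kind;
        final String key;
        final List<Object> columns;

        Change(Kind kind, String key, Object... columns) {
            this.kind = kind;
            this.key = key;
            this.columns = Arrays.asList(columns);
        }
    }

    // Applies the events in order and returns the current table contents, keyed by primary key.
    static Map<String, List<Object>> materialize(List<Change> changes) {
        Map<String, List<Object>> table = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.key, c.columns); // add the row, or store its new image
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.key);         // retract the old image of the row
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
                new Change(Kind.INSERT, "user1", "Alice", 12),
                new Change(Kind.INSERT, "user2", "Bob", 5),
                new Change(Kind.UPDATE_BEFORE, "user1", "Alice", 12),
                new Change(Kind.UPDATE_AFTER, "user1", "Alice", 100),
                new Change(Kind.DELETE, "user2", "Bob", 5));
        System.out.println(materialize(log));
    }
}
```

Running `main` prints `{user1=[Alice, 100]}`: the update pair replaces Alice's row and the delete removes Bob's. A Table built from the stream would need the same per-event kind, plus named and typed columns, to reach that state.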



Thank you very much for your reply



Re: how to convert DataStream<RowData> to Table

Roman Khachatryan
Hi,

I'm pulling in Timo and Jark as they know Table API better.

Regards,
Roman

On Sun, Apr 11, 2021 at 3:36 PM vtygoss <[hidden email]> wrote:

> Hi all,
>
> I have a scenario where I need to process OGG log data in Kafka using Flink SQL. I can convert the OGG log stream to a DataStream<RowData> in which each event carries a RowKind, but I have trouble converting the DataStream<RowData> to a Table.
>
> As a test, I tried both the StreamTableEnvironment#fromDataStream and createTemporaryView APIs. In both cases the resulting TableSchema is:
>
> ```
> root
>  |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>')
> ```
>
> The schema I want is:
>
> ```
> root
>  |-- column1: Type
>  |-- column2: Type
>  ...
> ```
>
> How can I convert a DataStream<RowData> with RowKind to a Table?
>
> Thank you very much for your reply.

Re: how to convert DataStream<RowData> to Table

Svend
In reply to this post by vtygoss
Hi,

Here's an example that works for me:


```
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

import java.util.List;

public class Stream2Table {

    public static void main(String[] args) {

        var streamingEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(streamingEnv);

        // Build a DataStream<Row> with explicit type information for each field,
        // so the stream is not treated as a single opaque RAW column.
        var userRows = streamingEnv.fromCollection(
                List.of(
                        Row.of("user1", "alice[hidden email]", "Alice"),
                        Row.of("user2", "bob[hidden email]", "Bob")
                ),
                new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING));

        // Name the three fields while converting the stream to a Table.
        var table = tableEnv
                .fromDataStream(userRows,
                        $("user_id"), $("handle"), $("name"));

        table.execute().print();
    }

}
```

You can also dig here; you'll probably find better examples.

Cheers,

Svend

