
how to convert DataStream<RowData> to Table



Hi All,

There is a scenario where I need to process OGG log data in Kafka using Flink SQL. I can convert the OGG log stream to a DataStream<RowData> in which each event has a RowKind, but I have trouble converting the DataStream<RowData> to a Table.

As a test, I tried the StreamTableEnvironment#fromDataStream and createTemporaryView APIs; in both cases the TableSchema is:

 root
  |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>')

I want to get this schema:

 root
  |-- column1: Type,
  |-- column2: Type,
  ….

How can I convert a DataStream<RowData> with RowKind to a Table?

Thank you very much for your reply


Re: how to convert DataStream<RowData> to Table

Roman Khachatryan

I'm pulling in Timo and Jark as they know Table API better.


On Sun, Apr 11, 2021 at 3:36 PM vtygoss <[hidden email]> wrote:

> Hi All,
> there is a scenario where I need to process OGG Log data in kafka using Flink Sql. I can convert the OGG Log Stream to DataStream<RowData> and each event has RowKind, but i have trouble converting DataStream<RowData> to a Table.
> For test, i tried StreamTableEnvironment#fromDataStream and createTemporaryView API, both TableSchema is
> ```
> root
>  |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>')
> ```
> i want to get the schema :
> ```
> root
>  |-- column1: Type,
>  |-- column2: Type,
> ….
> ```
> how to convert DataStream<RowData> with RowKind to Table?
> Thank you very much for your reply

Re: how to convert DataStream<RowData> to Table

In reply to this post by vtygoss

Here's an example that works for me:

"""
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

import java.util.List;

public class Stream2Table {

    public static void main(String[] args) {

        var streamingEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        var tableEnv = StreamTableEnvironment.create(streamingEnv);

        // Build a DataStream<Row> with explicit type information for the three fields.
        var userRows = streamingEnv.fromCollection(
                List.of(
                        Row.of("user1", "alice[hidden email]", "Alice"),
                        Row.of("user2", "bob[hidden email]", "Bob")),
                new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING));

        // Convert the stream to a Table, naming the fields by position.
        var table = tableEnv.fromDataStream(userRows,
                $("user_id"), $("handle"), $("name"));

        // Print the schema to check the column names and types.
        table.printSchema();
    }
}
"""

You can also dig here, you'll probably find better examples
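
One caveat on the RowKind part of the question: as far as I can tell, fromDataStream treats every record as an insert, so the example above does not by itself cover updates and deletes. To make the intended changelog semantics concrete, here is a minimal plain-Java sketch with no Flink dependency. The Kind enum, the Change class, and materialize are hypothetical names made up for illustration (Kind only mirrors the four values of Flink's RowKind), and the first field is assumed to be the primary key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {

    // Hypothetical stand-in for Flink's RowKind, defined here so the
    // sketch compiles without any Flink dependency.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One change event: the kind of change plus the positional field values.
    static final class Change {
        final Kind kind;
        final Object[] fields;

        Change(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    // Applies the events in order and returns the rows that remain,
    // keyed by the first field (assumed here to be the primary key).
    static Map<Object, List<Object>> materialize(List<Change> changes) {
        Map<Object, List<Object>> table = new LinkedHashMap<>();
        for (Change change : changes) {
            Object key = change.fields[0];
            switch (change.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    // Insert a new row or store the new version of an existing one.
                    table.put(key, Arrays.asList(change.fields));
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Retract the old version of the row.
                    table.remove(key);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(Kind.INSERT, "user1", "Alice"),
                new Change(Kind.INSERT, "user2", "Bob"),
                new Change(Kind.UPDATE_BEFORE, "user1", "Alice"),
                new Change(Kind.UPDATE_AFTER, "user1", "Alicia"),
                new Change(Kind.DELETE, "user2", "Bob"));

        // prints {user1=[user1, Alicia]}
        System.out.println(materialize(changes));
    }
}
```

If you are on Flink 1.13 or later, StreamTableEnvironment#fromChangelogStream looks like the place to start, since it accepts a DataStream<Row> whose rows carry a RowKind. I have not tried it against OGG data, so treat that as a pointer rather than a tested recipe.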


