Convert DataStream<Row> to Table with the same columns in Row

John Smith-2
Hi,

Sorry if this is a duplicate question but I couldn't find any answer to my question.
I am trying to convert a DataStream<Row> into a Table where the columns in the Row objects in the DataStream<Row> will become columns of the Table.
Here is how I tried to do it:

//Creating a DataStream of Row type. Let's assume the Row type has 3 columns:
// (c1 BIGINT, c2 String, c3 String)
DataStream<Row> rowStream = ....

// Convert it to a Table
Table t = tableEnv.fromDataStream(rowStream);

// Print the table
t.execute().print();

However, when I print the table it has one column of type Row instead of three columns (c1, c2, c3).

What I see in the print result is:

+----+--------------------------------+
| op |                                  f0   |
+----+--------------------------------+
| +I | +I{c1=1620968140951, ... |

What I would like to see is:

+----+---------------+-------+-------+
| op |            c1 |    c2 |    c3 |
+----+---------------+-------+-------+
| +I | 1620968140951 | 'foo' | 'bar' |

How can I convert the DataStream to a table that has the same columns as the Row objects in the DataStream?
Would really appreciate it if anyone can share a code snippet for the above example.

Thanks,
JS.

Re: Convert DataStream<Row> to Table with the same columns in Row

Timo Walther
Hi John,

Please check the type coming in from the DataStream API via
dataStream.getType(). It should be an instance of RowTypeInfo; otherwise,
the Table API cannot extract the columns correctly.

Usually, you can override the type of the last DataStream operation
using the `.returns(Types.ROW(...))` call.
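
A minimal sketch of this suggestion, assuming the three-column schema from the question (c1 BIGINT, c2 STRING, c3 STRING). The class name RowStreamToTable, the helper rowType(), and the fromElements/map source are hypothetical stand-ins for however the real stream is built. Without the returns(...) call, a lambda that produces Row is typically given a generic type rather than a RowTypeInfo, so the Table API has no field information to work with.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowStreamToTable {

    // Assumed schema taken from the question: (c1 BIGINT, c2 STRING, c3 STRING).
    static TypeInformation<Row> rowType() {
        return Types.ROW_NAMED(
                new String[] {"c1", "c2", "c3"},
                Types.LONG, Types.STRING, Types.STRING);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical source: one timestamp mapped to a three-field Row.
        // returns(...) attaches the named row type to the map operator.
        DataStream<Row> rowStream = env
                .fromElements(1620968140951L)
                .map(ts -> Row.of(ts, "foo", "bar"))
                .returns(rowType());

        // Inspect the type the Table API will see for this stream.
        System.out.println(rowStream.getType());

        // With a row type attached, the fields can be mapped to columns.
        Table t = tableEnv.fromDataStream(rowStream);
        t.execute().print();
    }
}
```

Types.ROW_NAMED is used here instead of Types.ROW so that the columns carry the names c1, c2, c3; with Types.ROW the fields would get positional names such as f0, f1, f2.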

I hope this helps.

Regards,
Timo


On 14.05.21 09:01, John Smith wrote:

> How can I convert the DataStream to a table that has the same columns as
> the columns in Row in the DataStream.

Re: Convert DataStream<Row> to Table with the same columns in Row

Fabian Paul
Hi John,

Can you share more of the code that builds the DataStream<Row>?
It would also be good to know which Flink version you are testing against. I just
tried the following code against the current master, and it seems to work fine:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<Row> rowStream = env.fromElements(Row.of(1, "test1"), Row.of(2, "test2"));
Table t = tableEnv.fromDataStream(rowStream);
t.execute().print();

+----+-------------+--------------------------------+
| op |          f0 |                             f1 |
+----+-------------+--------------------------------+
| +I |           1 |                          test1 |
| +I |           2 |                          test2 |
+----+-------------+--------------------------------+

Best,
Fabian


Re: Convert DataStream<Row> to Table with the same columns in Row

John Smith-2
Thanks for your help Timo and Fabian,
Got it working with Timo’s suggestion.

On Fri, May 14, 2021 at 6:14 AM Fabian Paul <[hidden email]> wrote:
> Can you maybe share more code about how you build the DataStream<Row>?
> It would also be good to know against which Flink version you are testing.