Flink Table to DataStream: how to access column name?

Flink Table to DataStream: how to access column name?

Yik San Chan
The question is cross-posted on Stack Overflow https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name.

I want to consume a Kafka topic into a table using Flink SQL, then convert it back to a DataStream.

Here is the `SOURCE_DDL`:

```
CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)
```

With Flink, I execute the DDL.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
```

Then, I convert it into a DataStream and implement the downstream logic in the `map(e => ...)` part.

```scala
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
```

Inside the `map(e => ...)` part, I would like to access the column name, in this case `last_5_clicks`. Why? Because I may have different sources with different column names (such as `last_10min_page_view`), but I would like to reuse the code in `map(e => ...)`.

Is there a way to do this? Thanks.

Best,
Yik San

Re: Flink Table to DataStream: how to access column name?

Till Rohrmann
Hi Yik San,

By converting the rows to a Tuple3 you effectively lose the information about the column names. You could instead call `toRetractStream[Row]`, which will give you a `DataStream[(Boolean, Row)]` in which you keep the column structure.
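A minimal sketch of what that could look like (assuming the `tableEnv` and `table` values from your earlier snippet and the Flink 1.12 Scala bridge API):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// toRetractStream[Row] yields (isInsert, row) pairs; the row type is not
// fixed to a particular tuple, so the schema stays generic.
val rowStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

rowStream.map { case (_, row) =>
  // Fields are still accessed by position here; for the kafka_source
  // schema, row.getField(2) would be last_5_clicks.
  row.getField(2)
}
```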

Cheers,
Till

On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <[hidden email]> wrote:

Re: Flink Table to DataStream: how to access column name?

Yik San Chan
Hi Till,

I looked inside the Row class; it does contain a member `private final Object[] fields;`, but how can I get the column names out of that member?

Thanks!

Best,
Yik San

On Tue, Mar 30, 2021 at 11:45 PM Till Rohrmann <[hidden email]> wrote:

Re: Flink Table to DataStream: how to access column name?

Till Rohrmann
There is a method `Row.getFieldNames`.
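For illustration, a hedged sketch against that API (Flink 1.13+): the method takes a boolean flag and may return `null` for purely position-based rows, so guard accordingly.

```scala
import org.apache.flink.types.Row

// Hypothetical helper: list a row's field names, if the row carries them.
// Row.getFieldNames(true) returns a java.util.Set[String], or null when
// the row is position-based only.
def columnNames(row: Row): Option[java.util.Set[String]] =
  Option(row.getFieldNames(true))
```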

Cheers,
Till

On Tue, Mar 30, 2021 at 6:06 PM Yik San Chan <[hidden email]> wrote:

Re: Flink Table to DataStream: how to access column name?

Yik San Chan
Hi Till,

In the version I am using (1.12.0), `getFieldNames` is not available on `Row`; see https://github.com/apache/flink/blob/release-1.12/flink-core/src/main/java/org/apache/flink/types/Row.java.

Is there any workaround for this in version 1.12.0? Thanks.

Best,
Yik San

On Wed, Mar 31, 2021 at 12:17 AM Till Rohrmann <[hidden email]> wrote:

Re: Flink Table to DataStream: how to access column name?

Till Rohrmann
You are right, Yik San. This feature was only introduced in the upcoming 1.13 release [1]. Sorry for causing confusion here. If you really need this feature, I fear you have to either use 1.13-SNAPSHOT or wait for the 1.13 release, which should happen in a couple of weeks.


Cheers,
Till

On Tue, Mar 30, 2021 at 6:33 PM Yik San Chan <[hidden email]> wrote:

Re: Flink Table to DataStream: how to access column name?

Yik San Chan
Thank you, Till!

Actually, I found I can access the column names via `Table.getSchema.getFieldNames` in version 1.12.0.
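As a sketch of that workaround (names follow the earlier snippets; Flink 1.12 Scala bridge API assumed): read the field names from the table schema before converting, then pair them with the positional values inside the map.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Capture the column names once, before the conversion to a DataStream.
val fieldNames: Array[String] = table.getSchema.getFieldNames

tableEnv.toRetractStream[Row](table).map { case (_, row) =>
  // Build a name -> value map so the downstream logic can look up
  // last_5_clicks, last_10min_page_view, etc. by name.
  fieldNames.zipWithIndex.map { case (name, i) => name -> row.getField(i) }.toMap
}
```

This keeps the `map` body source-agnostic: the same code works for any table whose schema was captured up front.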

Best,
Yik San

On Wed, Mar 31, 2021 at 4:26 PM Till Rohrmann <[hidden email]> wrote: