How to retain column names when converting a Table to a DataStream


Izual
Hi, Community:
  I ran into field name errors when converting between Table and DataStream.
  Flink version: 1.9.1

First, I create a DataStream, register it as the table 'source_table', and register a table function named 'foo':
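// Parse each line from the socket as an Int and register the stream as 'source_table'.
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

// Table function that emits the input multiplied by 10.
class Foo() extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}
tableEnv.registerFunction("foo", new Foo)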
Then I use sqlQuery to create a new table t1 with columns 'a' and 'b':
val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
t1 table schema: root
|-- a: INT
|-- b: INT
*/
println(s"t1 table schema: ${t1.getSchema}")
When I convert 't1' to a DataStream and then register it as a new table named 't1' (for some reason), the columns change to 'a' and 'f0' instead of 'a' and 'b'.
The only mention of 'f0' I can find is in the Java API described in Refs-1.
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
*/
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
I thought the default TypeExtractor(?) might not work well here, so I tried to set the field names explicitly, but that failed too:
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
If I add a proctime attribute when registering the source table, it works, but I do not want to add a proctime attribute that is never used:
tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)

My questions are:
1. Why does the code above behave this way?
2. How can I retain the column name 'b' when converting between Table and DataStream repeatedly?

Refs:

Thanks for your reply.


 

Re: How to retain column names when converting a Table to a DataStream

Dawid Wysakowicz-2

Hi,

I am afraid you are facing an issue that was not checked for/was not considered. I think your use case is absolutely valid and should be supported.

The problem you are facing, as far as I can tell from an initial investigation, is that the top-level projection/rename is not being applied. Internally, foo(a) is passed around as an unnamed expression and should be aliased at the top level. This happens when simply querying, which is why you get the expected result in the first case, when only printing the schema of a Table.

When translating to a DataStream, this final rename does not take place, which imo is a bug. You can see this behaviour if you add an additional projection: the renaming of the expression from the lateral table then happens a level deeper and is not stripped.

    val t1 = tableEnv.sqlQuery(
      """
        |SELECT 1, * FROM (
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b))
        |""".stripMargin
    )


    t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)
    new t1 table schema: root
     |-- EXPR$0: INT
     |-- a: INT
     |-- b: INT


Do you mind creating a JIRA issue to fix/support this case?

Unfortunately, I cannot think of a really good way for you to retain the column names. :(

Best,

Dawid

On 28/07/2020 10:26, izual wrote:

Re: Re: How to retain column names when converting a Table to a DataStream

Izual
And thanks for your advice to avoid 「top-level projection/rename」^_^




At 2020-07-30 16:58:45, "Dawid Wysakowicz" <[hidden email]> wrote:


Re: Re: How to retain column names when converting a Table to a DataStream

Jark Wu-3
Hi,

For now, you can explicitly pass the RowTypeInfo to retain the field names. This works on the master branch:

    val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
    // t1 stream schema: Row(a: Integer, b: Integer)
    println(s"t1 stream schema: ${t1Stream.getType()}")
    tEnv.registerDataStream("t1", t1Stream)
    /*
    new t1 table schema: root
    |-- a: INT
    |-- b: INT
     */
    println(s"new t1 table schema: ${tEnv.scan("t1").getSchema}")
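
As a side note on why passing the row type matters, here is a minimal sketch, assuming only flink-core is on the classpath. It shows that a RowTypeInfo built without names falls back to generated positional names (f0, f1, ...), while one built with an explicit name array keeps them. The object name RowTypeNames is hypothetical, and whether the 'f0' in the original post has exactly this origin is an assumption, not something confirmed in this thread.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo

// Hypothetical helper object, for illustration only.
object RowTypeNames {
  // No names given: RowTypeInfo generates positional names.
  val unnamed = new RowTypeInfo(Types.INT, Types.INT)

  // Names given explicitly: they are kept as-is.
  val named = new RowTypeInfo(
    Array[TypeInformation[_]](Types.INT, Types.INT),
    Array("a", "b"))

  def main(args: Array[String]): Unit = {
    println(unnamed.getFieldNames.mkString(", ")) // f0, f1
    println(named.getFieldNames.mkString(", "))   // a, b
  }
}
```

If the column names are known up front, a row type like `named` could presumably be passed in place of t1.getSchema.toRowType, i.e. t1.toAppendStream[Row](RowTypeNames.named). This is untested against 1.9.1.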


Best,
Jark

On Fri, 31 Jul 2020 at 18:03, izual <[hidden email]> wrote:
And thanks for your advice to avoid 「top-level projection/rename」^_^



