Hi, Community:

I ran into some field name errors when converting between Table and DataStream.

Flink version: 1.9.1

First, I initialize a DataStream, convert it to a table 'source', and register a table function named 'foo':

    val sourceStream = env.socketTextStream("127.0.0.1", 8010)

Then I use sqlQuery to generate a new table t1 with columns 'a' and 'b':

    val t1 = tableEnv.sqlQuery(

When I convert 't1' to a DataStream and then register it as a new table (for some reason) named 't1', the columns change to 'a' 'f0' instead of 'a' 'b'. The only mention of 'f0' I can find is in the Java API part of Refs-1.

    val t1Stream = t1.toAppendStream[Row]

Suspecting that the default TypeExtractor(?) does not work well here, I tried to set the field names explicitly, but that failed too:

    tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)

If I add a proctime when registering the source, it works, but I do not want to set a proctime that is never used:

    tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)

My questions are:
1. Why does the code above behave this way?
2. How can I retain 'b' when converting between table and stream frequently?

Refs:

Thanks for your reply.
Hi,

I am afraid you are facing an issue that was not checked for or considered. I think your use case is absolutely valid and should be supported.

As far as I can tell from an initial investigation, the problem is that the top-level projection/rename is not being applied. Internally, foo(a) is passed around as an unnamed expression and should be aliased at the top level. That aliasing does happen when you simply query the table, which is why you get the expected result in the first case, where you only print the schema of a Table. When translating to the DataStream, this final rename does not take place, which imo is a bug.

You can see this behaviour if you add an additional projection. Then the renaming of the expression from the lateral table happens a level deeper and is not stripped.

    val t1 = tableEnv.sqlQuery(

    t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)

Do you mind creating a JIRA issue to fix/support this case?

Unfortunately, I cannot think of a really good way for you to retain the column names. :(

Best,
Dawid

On 28/07/2020 10:26, izual wrote:
I created a JIRA issue here: https://issues.apache.org/jira/browse/FLINK-18782

And thanks for your advice to avoid the 「top-level projection/rename」 ^_^

At 2020-07-30 16:58:45, "Dawid Wysakowicz" <[hidden email]> wrote:
Hi,

For now, you can explicitly set the RowTypeInfo to retain the field names. This works in the master branch:

    val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
    // t1 stream schema: Row(a: Integer, b: Integer)
    println(s"t1 stream schema: ${t1Stream.getType()}")
    tEnv.registerDataStream("t1", t1Stream)
    /*
    new t1 table schema: root
     |-- a: INT
     |-- b: INT
     */
    println(s"new t1 table schema: ${tEnv.scan("t1").getSchema}")

Best,
Jark

On Fri, 31 Jul 2020 at 18:03, izual <[hidden email]> wrote:
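For readers who convert back and forth often, Jark's workaround above could be wrapped in a small helper. This is only a sketch under stated assumptions: the object name `RetainFieldNames` and the method `reRegister` are hypothetical and not part of Flink. The imports assume the Flink 1.9/1.10 package layout, whereas newer releases moved the Scala bridge classes under `org.apache.flink.table.api.bridge.scala`. Jark reports that the approach works on the master branch, so it may not behave the same way on 1.9.1.

```scala
// Sketch only: imports assume the Flink 1.9/1.10 package layout.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical helper, not a Flink API.
object RetainFieldNames {

  // Converts `table` to an append stream and registers it again under `name`,
  // passing the row type derived from the table schema explicitly instead of
  // relying on the implicitly derived TypeInformation[Row].
  def reRegister(tableEnv: StreamTableEnvironment, table: Table, name: String): DataStream[Row] = {
    // The table schema still carries the column names of the query result.
    val rowType: TypeInformation[Row] = table.getSchema.toRowType

    // Supply the row type in the implicit parameter position, as in Jark's snippet.
    val stream: DataStream[Row] = table.toAppendStream[Row](rowType)

    // No explicit field expressions here: the names are taken from the stream's type.
    tableEnv.registerDataStream(name, stream)
    stream
  }
}
```

With the thread's variables this would be called as `RetainFieldNames.reRegister(tableEnv, t1, "t1")`. Printing `tableEnv.scan("t1").getSchema` afterwards, as Jark does, is a quick way to check whether the names survived on the Flink version in use.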