Register time attribute while converting a DataStream<CRow> to Table


Jiahui Jiang
Hello Flink friends, I have a retract stream in the form of a 'DataStream<CRow>' that I want to register in my table environment, while also exposing a processing-time column in the table.

For a regular DataStream, I have been doing 'tableEnvironment.createTemporaryView(path, dataStream, "field1,field2, ..,__processing_time_column.proctime")' without any issue. But for this retract stream, I get the error "org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type."

Digging a little deeper, I found that TypeInfoUtils#extractFieldInformation doesn't handle CRowTypeInfo as a known case. Looking at the behavior of

Since CRowTypeInfo is a standard CompositeType, could we handle it here as well, instead of only handling the 'if (inputType instanceof PojoTypeInfo)' case? Is there any risk that I'm not aware of?
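To make the error concrete, here is a hypothetical, heavily simplified model of the dispatch described above: a type the extractor recognizes as composite may have several fields referenced, while any type without a dedicated branch falls back to atomic handling, where referencing more than one physical field fails. The class and method names are invented for illustration and are not Flink's TypeInfoUtils code; treating '.proctime' references as virtual (non-physical) fields is likewise an assumption of the model.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of field-reference validation.
// This is NOT Flink's implementation; names and rules are illustrative assumptions.
final class FieldInfoSketch {

    // A type description the extractor may or may not recognize.
    interface TypeDesc {}

    // A composite type the extractor has a branch for (like a POJO type).
    static final class KnownComposite implements TypeDesc {}

    // A composite type the extractor has no branch for (standing in for CRowTypeInfo).
    static final class UnrecognizedComposite implements TypeDesc {}

    static final class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    // Counts the physical fields referenced by `refs` after validating them against `type`.
    // Assumption: ".proctime" references are virtual time attributes, not physical fields.
    static int countPhysicalFields(TypeDesc type, List<String> refs) {
        int physical = 0;
        for (String ref : refs) {
            if (!ref.endsWith(".proctime")) {
                physical++;
            }
        }
        if (type instanceof KnownComposite) {
            // Known composite: references resolve to named fields (name checking omitted here).
            return physical;
        }
        // Fallback: an unrecognized type is treated as atomic, so at most one field may be referenced.
        if (physical > 1) {
            throw new ValidationException("Too many fields referenced from an atomic type.");
        }
        return physical;
    }

    public static void main(String[] args) {
        List<String> refs = Arrays.asList("field1", "field2", "__processing_time_column.proctime");
        System.out.println(countPhysicalFields(new KnownComposite(), refs)); // prints 2
        try {
            countPhysicalFields(new UnrecognizedComposite(), refs);
        } catch (ValidationException e) {
            System.out.println(e.getMessage()); // prints "Too many fields referenced from an atomic type."
        }
    }
}
```

In this model, adding a branch for the unrecognized type would make the error go away, which matches the question; whether that is semantically safe is a separate matter, addressed in the reply.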

Thank you!

Re: Register time attribute while converting a DataStream<CRow> to Table

Dawid Wysakowicz

Hi,

Unfortunately, consuming upsert streams is not supported yet, and it is not as simple as adding the type information there as you suggested: even if you do that, the planner will still treat every record internally as an append message. There is an ongoing effort (FLIP-95 [1]) to support this in Flink 1.11.
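A minimal sketch of why this matters, assuming a simplified stand-in for CRow (a payload plus a boolean change flag): summing the same three messages gives a different result depending on whether retractions are honoured or every message is treated as an append. The ChangeRecord class and both methods are hypothetical and are not Flink's CRow API or planner logic.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a retract stream; ChangeRecord is illustrative, not Flink's CRow.
final class RetractVsAppend {

    // Payload plus change flag: true = accumulate (insert), false = retract (delete).
    static final class ChangeRecord {
        final boolean accumulate;
        final long value;

        ChangeRecord(boolean accumulate, long value) {
            this.accumulate = accumulate;
            this.value = value;
        }
    }

    // Correct interpretation: a retraction cancels the contribution of an earlier record.
    static long sumAsRetractStream(List<ChangeRecord> records) {
        long sum = 0;
        for (ChangeRecord r : records) {
            sum += r.accumulate ? r.value : -r.value;
        }
        return sum;
    }

    // If the change flag is ignored, every message looks like a new insert.
    static long sumAsAppendStream(List<ChangeRecord> records) {
        long sum = 0;
        for (ChangeRecord r : records) {
            sum += r.value;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Insert 1, retract that 1, insert 3: the logical result is 3.
        List<ChangeRecord> stream = Arrays.asList(
                new ChangeRecord(true, 1),
                new ChangeRecord(false, 1),
                new ChangeRecord(true, 3));
        System.out.println("as retract stream: " + sumAsRetractStream(stream)); // as retract stream: 3
        System.out.println("as append stream: " + sumAsAppendStream(stream));   // as append stream: 5
    }
}
```

The gap between 3 and 5 illustrates the kind of silent error to expect if the change flag were lost and each record consumed as a plain append.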


Best,

Dawid


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces


(In reply to Jiahui Jiang's message of 13/05/2020 01:03, shown above.)
