Hi,
Experimenting with the StreamTableEnvironment I built something like this:
Because the "EventTime" field was tagged with ".rowtime" it is now used as the rowtime and has a DATETIME type, so I can do this:
So far so good. Working towards a more realistic scenario, I have a source that produces a stream of records that have been defined using Apache Avro. So I have a Measurement.avdl that (among other things) contains something like this:
Now because the registerDataStream call can also derive the schema from the provided data I can do this:
This is very nice because any real schema is big (a few hundred columns) and changes over time. Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a consequence I get this error:
So far I have not yet figured out how to make the system understand that the timestamp column should be treated as the rowtime. How do I do that? -- Best regards / Met vriendelijke groeten, Niels Basjes
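To make the mismatch concrete: an Avro long timestamp is typically epoch milliseconds, while a SQL rowtime attribute needs an actual timestamp type, not a BIGINT. A stdlib-only sketch of that conversion (the class and method names here are illustrative, not part of any Flink API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochToTimestamp {
    // An Avro "long" timestamp is usually epoch milliseconds; a SQL
    // DATETIME/TIMESTAMP is a structured date-time value. This is the
    // conversion that the table environment has to be told to apply.
    static String toSqlTimestamp(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atOffset(ZoneOffset.UTC)
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
    }

    public static void main(String[] args) {
        System.out.println(toSqlTimestamp(0L)); // 1970-01-01 00:00:00.000
    }
}
```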
Hi Niels,
if you are coming from DataStream
API, all you need to do is to write a timestamp extractor.
When you call:
tableEnv.registerDataStream("TestStream",
letterStream, "EventTime.rowtime, letter, counter");
The ".rowtime" means that the
framework will extract the rowtime from the stream record
timestamp. You don't need to name all fields again but could
simply construct a string from
letterStream.getTypeInfo().getFieldNames(). I hope we can
improve this further in the future as part of FLIP-37.
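Timo's suggestion of building the expression string from the field names could be sketched like this with only the standard library. In real code the names would come from the stream's type information (he mentions letterStream.getTypeInfo().getFieldNames()); the hard-coded array and the class name here are assumptions for illustration:

```java
import java.util.StringJoiner;

public class FieldExprBuilder {
    // Builds the field expression string passed to registerDataStream,
    // tagging the chosen event-time field with ".rowtime" and keeping
    // every other field name as-is.
    static String buildFields(String[] fieldNames, String rowtimeField) {
        StringJoiner joiner = new StringJoiner(", ");
        for (String name : fieldNames) {
            joiner.add(name.equals(rowtimeField) ? name + ".rowtime" : name);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        String fields = buildFields(
                new String[]{"EventTime", "letter", "counter"}, "EventTime");
        System.out.println(fields); // EventTime.rowtime, letter, counter
    }
}
```

This way only the name of the rowtime field has to be known in advance, which matters for the big, evolving schemas Niels describes.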
Regards,
Timo

On 14.08.19 at 17:00, Niels Basjes wrote:
Hi,

It has taken me quite a bit of time to figure this out. This is the solution I have now (works on my machine). Please tell me where I can improve this.

Turns out that the schema you provide for registerDataStream only needs the 'top level' fields of the Avro data structure. With only the top fields there you can still access nested fields with something like "topfield.x.y.z" in the SQL statement. What I found is that the easiest way to make this all work is to ensure the rowtime field in the structure is at the top level (which makes sense in general) and to generate the fields string, so I only need to know the name of the "rowtime" field.

So I have

    DataStream<Measurement> inputStream = ...

and then I register the stream with

    TypeInformation<Measurement> typeInformation = TypeInformation.of(Measurement.class);

Now after the actual SQL has been executed I have

    Table resultTable = ...

Simply feeding this into a DataStream with something like

    TypeInformation<Row> tupleType = new RowTypeInfo(resultTable.getSchema().getFieldTypes());

fails badly with

    org.apache.flink.table.api.TableException: The time indicator type is an internal type only.

Turns out that the schema of the output contains a field that was created by TUMBLE_START, which is of type TimeIndicatorTypeInfo. So I have to do it this way (NASTY!):

    final TypeInformation<?>[] fieldTypes = resultTable.getSchema().getFieldTypes();

Which gives me the desired DataStream.

Niels Basjes

On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <[hidden email]> wrote:
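The "NASTY" workaround Niels describes amounts to patching the field types array, swapping each internal TimeIndicatorTypeInfo for a plain SQL timestamp type before building the RowTypeInfo. The Flink classes aren't available here, so this is a stdlib-only sketch of that patching loop, with type names standing in for the actual TypeInformation objects:

```java
import java.util.Arrays;

public class PatchFieldTypes {
    // Stand-in for: take resultTable.getSchema().getFieldTypes() and
    // replace every TimeIndicatorTypeInfo with the plain SQL timestamp
    // type before constructing the output RowTypeInfo.
    static String[] patch(String[] fieldTypes) {
        return Arrays.stream(fieldTypes)
                .map(t -> t.equals("TimeIndicatorTypeInfo") ? "SQL_TIMESTAMP" : t)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        String[] patched = patch(
                new String[]{"STRING", "LONG", "TimeIndicatorTypeInfo"});
        System.out.println(Arrays.toString(patched)); // [STRING, LONG, SQL_TIMESTAMP]
    }
}
```

Fabian's CAST suggestion below avoids this loop entirely by stripping the time indicator property inside the query itself.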
Hi Niels,

I think (not 100% sure) you could also cast the event time attribute to TIMESTAMP before you emit the table. This should remove the event time property (and thereby the TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output types.

Best,
Fabian

On Wed, Aug 21, 2019 at 10:51 AM Niels Basjes <[hidden email]> wrote:
Hi. Can you give me an example of the actual syntax of such a cast? On Tue, 10 Sep 2019, 16:30 Fabian Hueske, <[hidden email]> wrote:
Hi,

That would be regular SQL cast syntax:

    SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...

On Tue, Sep 10, 2019 at 6:07 PM Niels Basjes <[hidden email]> wrote: