Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Niels Basjes
Hi,

Experimenting with the StreamTableEnvironment, I built something like this:

DataStream<Tuple3<Long, String, Long>> letterStream = ...
tableEnv.registerDataStream("TestStream", letterStream, "EventTime.rowtime, letter, counter");

Because the "EventTime" column was tagged with ".rowtime", it is now used as the rowtime and has the DATETIME type, so I can do this:

TUMBLE_START(EventTime, INTERVAL '1' MINUTE)

So far so good.
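For completeness, the full windowed query over that registered table would look roughly like this (the aggregate and aliases are mine, just for illustration):

```sql
SELECT
  letter,
  SUM(counter)                                 AS total,
  TUMBLE_START(EventTime, INTERVAL '1' MINUTE) AS windowStart
FROM TestStream
GROUP BY
  letter,
  TUMBLE(EventTime, INTERVAL '1' MINUTE)
```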

Working towards a more realistic scenario I have a source that produces a stream of records that have been defined using Apache Avro.

So I have a Measurement.avdl that (among other things) contains something like this:

record Measurement {
    /** The time (epoch in milliseconds since 1970-01-01 UTC) when the event occurred */
    long                        timestamp;
    string                      letter;
    long                        pageviews;
}

Now, because the registerDataStream call can also derive the schema from the provided data, I can do this:

DataStream<Measurement> inputStream = ...
tableEnv.registerDataStream("DataStream", inputStream);

This is very nice because any real schema is big (a few hundred columns) and changes over time.

Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a consequence I get this error:

Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
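For reference, this is exactly the gap the error is pointing at: the Avro field carries raw epoch milliseconds, while TUMBLE wants a SQL timestamp. In plain Java (the epoch value here is made up) the conversion would be:

```java
import java.sql.Timestamp;
import java.time.Instant;

public class EpochToTimestamp {
    public static void main(String[] args) {
        // A made-up epoch value in milliseconds, as carried by the Avro 'timestamp' field.
        long epochMillis = 1565794800000L;

        // This is the conversion the SQL layer cannot guess on its own:
        // it has no way of knowing this particular BIGINT column is a point in time.
        Timestamp ts = Timestamp.from(Instant.ofEpochMilli(epochMillis));

        System.out.println(ts.toInstant()); // prints 2019-08-14T15:00:00Z
    }
}
```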

So far I have not yet figured out how to make the system understand that the timestamp column should be treated as the rowtime.
How do I do that?

--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Timo Walther
Hi Niels,

if you are coming from the DataStream API, all you need to do is write a timestamp extractor.

When you call:

tableEnv.registerDataStream("TestStream", letterStream, "EventTime.rowtime, letter, counter");

The ".rowtime" means that the framework will extract the rowtime from the stream record timestamp. You don't need to name all fields again but could simply construct a string from letterStream.getTypeInfo().getFieldNames(). I hope we can improve this further in the future as part of FLIP-37.
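A sketch of such an extractor, as it looked on the 1.8/1.9-era DataStream API (the getter name on Measurement is an assumption about the Avro-generated class):

```java
// Sketch only: assumes the pre-1.9 DataStream API and an Avro-generated
// getTimestamp() accessor on Measurement.
DataStream<Measurement> withTimestamps = inputStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Measurement>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(Measurement measurement) {
            // The epoch-millis field becomes the stream record timestamp,
            // which the ".rowtime" marker then picks up.
            return measurement.getTimestamp();
        }
    });
```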

Regards,
Timo


Re: Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Niels Basjes
Hi,

It has taken me quite a bit of time to figure this out.
This is the solution I have now (works on my machine).

Please tell me where I can improve this.

Turns out that the schema you provide to registerDataStream only needs the 'top level' fields of the Avro data structure.
With only the top fields there, you can still access nested fields with something like "topfield.x.y.z" in the SQL statement.

What I found is that the easiest way to make this all work is to ensure the rowtime field is at the top level of the structure (which makes sense in general) and to generate the fields string, for which I only need to know the name of the "rowtime" field.

So I have:

    DataStream<Measurement> inputStream = ...

then I register the stream with:

    TypeInformation<Measurement> typeInformation = TypeInformation.of(Measurement.class);
    String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);

    List<String> rootSchema = new ArrayList<>();
    for (String fieldName : fieldNames) {
        if (rowtimeFieldName.equals(fieldName)) {
            rootSchema.add(fieldName + ".rowtime");
        } else {
            rootSchema.add(fieldName);
        }
    }

    tableEnv.registerDataStream("MeasurementStream", inputStream, String.join(",", rootSchema));
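With that registration in place, a query can use the field directly as rowtime; note that `timestamp` is a reserved word, so it needs backtick quoting in Flink SQL (the query shape itself is illustrative):

```sql
SELECT
  letter,
  SUM(pageviews)                                 AS views,
  TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE) AS windowStart
FROM MeasurementStream
GROUP BY
  letter,
  TUMBLE(`timestamp`, INTERVAL '1' MINUTE)
```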

Now, after the actual SQL has been executed, I have a:

    Table resultTable = ...

Simply feeding this into a DataStream with something like this:

    TypeInformation<Row> tupleType = new RowTypeInfo(resultTable.getSchema().getFieldTypes());
    DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

fails badly:

    org.apache.flink.table.api.TableException: The time indicator type is an internal type only.
        at org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)

Turns out that the schema of the output contains a field that was created by TUMBLE_START, which is of type TimeIndicatorTypeInfo.

So I have to do it this way (NASTY!):

    final TypeInformation<?>[] fieldTypes = resultTable.getSchema().getFieldTypes();
    for (int index = 0; index < fieldTypes.length; index++) {
        if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
            fieldTypes[index] = SQL_TIMESTAMP;
        }
    }
    TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
    DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);

Which gives me the desired DataStream.


Niels Basjes







--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Fabian Hueske-2
Hi Niels,

I think (not 100% sure) you could also cast the event time attribute to TIMESTAMP before you emit the table.
This should remove the event time property (and thereby the TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output types.

Best, Fabian



--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Niels Basjes
Hi.

Can you give me an example of the actual syntax of such a cast?



--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Flink SQL: How to tag a column as 'rowtime' from an Avro-based DataStream?

Fabian Hueske-2
Hi,

That would be regular SQL CAST syntax:

SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...

