Tumbling window expects a time attribute for grouping in a stream environment

enrico canzonieri
Hi,
I'm trying to window and groupBy a stream using the Table API, but I get a ValidationException in the windowing function.
Here is the relevant code:

tableEnv.registerTableSource(schema.getName, src)
val table = tableEnv.scan(schema.getName)
val t = table.window(Tumble over 1.minutes on 'time as 'w).groupBy('host, 'w).select('host)


"time" is defined as Long in my schema. The error I get is:
Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'time, 60000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.

I also tried to define a window that uses processing time, but the syntax described in the documentation, "Tumble over 1.minutes as 'w", doesn't seem to work anymore. Specifically, it seems that a window now always expects the "on" call.

Has anybody encountered this issue? I'm using Flink 1.3-SNAPSHOT.

thanks

Re: Tumbling window expects a time attribute for grouping in a stream environment

Timo Walther
Hi Enrico,

The docs of 1.3-SNAPSHOT are a bit out of sync right now, but they will be updated within the next few days to 1-2 weeks.

We recently introduced so-called "time indicators". These are attributes that correspond to Flink's time and watermarks. You declare a logical field that represents Flink's internal time in a table program.

In your example you need to append a "time.rowtime" or "time.proctime" to your table schema definition.

You can find some examples here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
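
A minimal sketch of the pattern, assuming the Flink 1.3 Scala Table API and a DataStream as input rather than a TableSource. The Event case class, the sample records, and the 5-second out-of-orderness bound are hypothetical; 'rowtime is simply the name chosen here for the logical time attribute.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object RowtimeWindowSketch {

  // Hypothetical record type mirroring the schema in the question.
  case class Event(host: String, time: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Timestamps and watermarks are assigned on the DataStream,
    // before it is converted into a Table.
    val events = env
      .fromElements(
        Event("a", 1495580133000L),
        Event("a", 1495580143000L),
        Event("b", 1495580153000L))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(5)) {
          override def extractTimestamp(e: Event): Long = e.time
        })

    // 'rowtime.rowtime appends a logical event-time attribute to the schema.
    // 'proctime.proctime would append a processing-time attribute instead.
    val table = events.toTable(tableEnv, 'host, 'time, 'rowtime.rowtime)

    // The window is defined on the time attribute, not on the Long field 'time.
    val result = table
      .window(Tumble over 1.minutes on 'rowtime as 'w)
      .groupBy('host, 'w)
      .select('host, 'w.start, 'host.count)

    result.toAppendStream[Row].print()
    env.execute("rowtime window sketch")
  }
}
```

The point of the sketch is that the window's "on" field is the declared time attribute ('rowtime), while the original Long field 'time stays an ordinary column.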

If you have further questions, feel free to ask them. It helps us to improve the documentation.

Regards,
Timo




Re: Tumbling window expects a time attribute for grouping in a stream environment

enrico canzonieri
Hi Timo, thanks for your help!

I tried to follow the examples in the tests but I still have the same issue.
I changed my schema and added an additional field "rowtime". My schema now is:
root
 |-- rowtime: org.apache.flink.table.expressions.RowtimeAttribute(expr: GenericType<org.apache.flink.table.expressions.Expression>)
 |-- time: Long
 |-- host: String

If I run the code:
table.select('rowtime).toDataStream[Row].print()
I get:
RowtimeAttribute(1495580133000)
RowtimeAttribute(1495580143000)
RowtimeAttribute(1495580153000)

But if I run:
table.window(Tumble over 1.minutes on 'rowtime as 'w).groupBy('host, 'w).select('host)
I still get the previous error:
TumblingGroupWindow('w, 'rowtime, 60000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
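
As a side check on the values printed above: the ValidationException is raised while the query is validated, before any record is processed, so the values themselves are not what is being rejected. The sketch below is plain Scala with no Flink dependency. It uses the same arithmetic as Flink's TimeWindow.getWindowStartWithOffset with an offset of 0 to show which one-minute tumbling window each timestamp would fall into; the object and method names are made up for illustration.

```scala
object TumblingWindowStart {

  // Start of the tumbling window that contains timestampMs (offset 0).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs + sizeMs) % sizeMs

  def main(args: Array[String]): Unit = {
    val sizeMs = 60000L // 1.minutes, shown as 60000.millis in the error message
    val timestamps = Seq(1495580133000L, 1495580143000L, 1495580153000L)

    timestamps.foreach { ts =>
      val start = windowStart(ts, sizeMs)
      // Windows are half-open intervals: [start, start + size)
      println(s"$ts -> [$start, ${start + sizeMs})")
    }
  }
}
```

All three timestamps map to the window starting at 1495580100000, so they would be grouped into a single one-minute window once the time attribute is accepted.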

I'm using a Kafka09TableSource as the data source, but it doesn't allow me to specify a timestamp assigner. I think the actual consumer is not exposed to the user, so I cannot call assignTimestampsAndWatermarks on it. Could that be the problem? Should we expose that function so that we can assign timestamps and watermarks in a TableSource?

The time characteristic in the execution environment is set to EventTime in my code.

Cheers,
Enrico


Re: Tumbling window expects a time attribute for grouping in a stream environment

enrico canzonieri
I solved this by implementing a new Kafka09TableSource in my application. The class I implemented extends both DefinedRowtimeAttribute and DefinedProctimeAttribute, and it exposes the consumer so that I can assign the timestamp extractor.

I'm not sure if this is the right approach, but if it is, I wonder if we could contribute those changes to KafkaTableSource to make it more generic.
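
For readers who want the general shape of such a source, here is a rough sketch. It is not the class described above: it assumes the Flink 1.3 interfaces in org.apache.flink.table.sources, where each of the two time attribute traits has a single method that returns the attribute name. The class name TimedTableSource and its constructor parameters are hypothetical, and a generic SourceFunction stands in for the Kafka consumer.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.types.Row

// Hypothetical table source that lets the caller plug in a timestamp/watermark
// assigner and declares both logical time attributes.
class TimedTableSource(
    source: SourceFunction[Row],                   // e.g. a Kafka consumer
    rowType: TypeInformation[Row],                 // physical fields, e.g. time and host
    assigner: AssignerWithPeriodicWatermarks[Row]) // extracts event time from a row
  extends StreamTableSource[Row]
  with DefinedRowtimeAttribute
  with DefinedProctimeAttribute {

  // Timestamps and watermarks are assigned on the stream before the
  // Table API sees it. Note that this is the Java DataStream API.
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.addSource(source, rowType).assignTimestampsAndWatermarks(assigner)

  override def getReturnType: TypeInformation[Row] = rowType

  // Names of the logical attributes added to the table schema.
  override def getRowtimeAttribute: String = "rowtime"
  override def getProctimeAttribute: String = "proctime"
}
```

After registering an instance with tableEnv.registerTableSource, a query could then window on 'rowtime (event time) or 'proctime (processing time) in the "on" clause.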


Re: Tumbling window expects a time attribute for grouping in a stream environment

Fabian Hueske-2
Hi Enrico,

That's (for now) the right approach. I agree that the KafkaTableSource should implement both DefinedXTimeAttribute interfaces.

Best, Fabian
