assign time attribute after first window group when using Flink SQL

Ivan Wang

Hi all,

I'd like to chain two group windows in my program, as shown below:

Table myTable = cTable
        .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
        .groupBy("symbol, w1")
        .select("w1.start as start, w1.end as end, symbol, price.max as p_max, price.min as p_min")
        .window(Slide.over("150.rows").every("1.rows").on("start").as("w2"))
        .groupBy("symbol, w2")
        .select("w2.start, w2.end, symbol, p_max.max, p_min.min");

However, it throws an error:

SlidingGroupWindow('w2, 'start, 150.rows, 1.rows) is invalid: Sliding window expects a time attribute for grouping in a stream environment.
    at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:149)
    at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:658)
    at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1159)
    at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1179)
    at minno.gundam.ReadPattern.main(ReadPattern.java:156)

Is there any way to assign a time attribute after the first groupBy (w1)?

Thanks

Ivan

Re: assign time attribute after first window group when using Flink SQL

Fabian Hueske-2
Sorry, I forgot to CC the user mailing list in my reply.

2018-04-12 17:27 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi,

Assuming you are using event time, the right function to generate a rowtime attribute from a window is "w1.rowtime" instead of "w1.start".

The reason Flink is picky about this is that we must ensure the result rows of the windows are aligned with the watermarks of the stream.

Best, Fabian
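To illustrate the watermark-alignment point, here is a minimal plain-Java sketch; it is not Flink code, and the class and method names are hypothetical. It computes the bounds of a 15-second tumbling window, treats "w1.rowtime" as the window's inclusive upper bound (end minus 1 ms), and checks whether a result row stamped with the window start or with the rowtime would be late. It assumes the usual watermark contract: a record is late when its timestamp is not greater than the last watermark a downstream operator has received.

```java
// Plain-Java illustration of tumbling-window bounds and watermark alignment.
// WindowAlignment and its methods are hypothetical; this is not Flink's implementation.
class WindowAlignment {

    // Start of the tumbling window containing ts (epoch millis) for a window of `size` millis.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Exclusive end of that window (what w1.end denotes).
    static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }

    // Inclusive upper bound of the window, end - 1 ms (what w1.rowtime is assumed to denote).
    static long windowRowtime(long ts, long size) {
        return windowEnd(ts, size) - 1;
    }

    // A record is late downstream if its timestamp is not greater than
    // the last watermark that the downstream operator has already received.
    static boolean isLate(long recordTs, long lastWatermark) {
        return recordTs <= lastWatermark;
    }

    public static void main(String[] args) {
        long size = 15_000L;   // 15-second tumbling window, as in w1
        long eventTs = 7_000L; // an event inside the window [0, 15000)
        // The window has not fired yet, so the last forwarded watermark is below end - 1.
        long lastWatermark = 9_000L;

        long start = windowStart(eventTs, size);
        long rowtime = windowRowtime(eventTs, size);
        System.out.println("start=" + start + " late=" + isLate(start, lastWatermark));
        System.out.println("rowtime=" + rowtime + " late=" + isLate(rowtime, lastWatermark));
    }
}
```

Because the window only fires once the watermark reaches end minus 1 ms, every watermark forwarded before that is smaller than the rowtime, whereas it may already be past the window start.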


On Sun, 8 Apr 2018, 22:26, Ivan Wang <[hidden email]> wrote:

Re: assign time attribute after first window group when using Flink SQL

Ivan Wang
Thanks Fabian. I tried using "rowtime", and Flink throws the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SlidingGroupWindow('w2, 'end, 150.rows, 1.rows) is invalid: Event-time grouping windows on row intervals in a stream environment are currently not supported.

Then I tried OverWindows, which luckily also meet my requirement. My table query now looks like this:

Table myTable = cTable
        .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
        .groupBy("symbol, w1")
        .select("w1.rowtime as end, symbol, price.max as p_max, price.min as p_min")
        .window(Over.partitionBy("symbol").orderBy("end").preceding("149.rows").as("w2"))
        .select("symbol as symbol_, end, p_max.max over w2 as max, p_min.min over w2 as min");
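For reference, a minimal plain-Java sketch of what the OVER window above computes, assuming rows arrive per symbol in rowtime order: for each aggregated row, the max of p_max and the min of p_min over that row and up to 149 preceding rows of the same symbol. RowsOverWindow is a hypothetical name, and this is not how Flink implements over windows internally; it recomputes the frame on every row for clarity rather than efficiency.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of a ROWS-based OVER window partitioned by symbol.
// RowsOverWindow is a hypothetical name; this is not Flink's implementation.
class RowsOverWindow {
    private final int preceding;
    // Per-symbol buffer holding the current row plus up to `preceding` earlier rows.
    private final Map<String, Deque<double[]>> buffers = new HashMap<>();

    RowsOverWindow(int preceding) {
        this.preceding = preceding;
    }

    // Adds one aggregated row for `symbol` (assumed to arrive in rowtime order)
    // and returns {max of pMax, min of pMin} over the current frame.
    double[] add(String symbol, double pMax, double pMin) {
        Deque<double[]> frame = buffers.computeIfAbsent(symbol, k -> new ArrayDeque<>());
        frame.addLast(new double[] {pMax, pMin});
        while (frame.size() > preceding + 1) {
            frame.removeFirst(); // drop rows that fell out of the frame
        }
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        for (double[] row : frame) {
            max = Math.max(max, row[0]);
            min = Math.min(min, row[1]);
        }
        return new double[] {max, min};
    }

    public static void main(String[] args) {
        // The query uses preceding("149.rows"); 2 preceding rows keep the demo short.
        RowsOverWindow over = new RowsOverWindow(2);
        double[][] rowsForA = {{10, 5}, {12, 6}, {8, 7}, {9, 8}, {7, 9}};
        for (double[] r : rowsForA) {
            double[] result = over.add("A", r[0], r[1]);
            System.out.println("A max=" + result[0] + " min=" + result[1]);
        }
    }
}
```

With `new RowsOverWindow(149)` the frame matches the 150-row window in the query; each symbol keeps its own buffer, just as `partitionBy("symbol")` keeps separate frames per key.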

It works, and I get what I want. However, the result is not ordered by rowtime (aliased here as "end"). Is this the default behavior, and is there any way to get it ordered?

Here is the full requirement:

Basically, there is one raw stream (r1), which I group first by time (w1) and then by row count (w2). I'd like to compare the "price" field of every raw event with the same field of the closest preceding row in w2.
If the condition is met, I'd like to use the price value and timestamp of that event to look up one matching event in another raw stream (r2).
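The "closest preceding row" lookup described above can be sketched in plain Java, assuming the aggregated rows are keyed by symbol and identified by their rowtime. PrecedingLookup, its methods, and the placeholder condition (raw price above the preceding row's p_max) are hypothetical, since the thread does not state the actual condition. The sketch also ignores out-of-order arrival and state cleanup, which a streaming job would have to handle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink API: stores aggregated rows per symbol,
// keyed by rowtime, and finds the closest one preceding a raw event.
class PrecedingLookup {
    private final Map<String, TreeMap<Long, Double>> aggBySymbol = new HashMap<>();

    // Records an aggregated row (e.g. the over-window max) for a symbol.
    void addAggregate(String symbol, long rowtime, double pMax) {
        aggBySymbol.computeIfAbsent(symbol, k -> new TreeMap<>()).put(rowtime, pMax);
    }

    // Aggregated row with the largest rowtime strictly before eventTs, or null if none.
    Map.Entry<Long, Double> closestPreceding(String symbol, long eventTs) {
        TreeMap<Long, Double> rows = aggBySymbol.get(symbol);
        return rows == null ? null : rows.lowerEntry(eventTs);
    }

    // Placeholder condition (an assumption; the thread does not specify one):
    // the raw price is above the p_max of the closest preceding aggregated row.
    boolean conditionMet(String symbol, long eventTs, double price) {
        Map.Entry<Long, Double> prev = closestPreceding(symbol, eventTs);
        return prev != null && price > prev.getValue();
    }

    public static void main(String[] args) {
        PrecedingLookup lookup = new PrecedingLookup();
        lookup.addAggregate("A", 14_999L, 10.0);
        lookup.addAggregate("A", 29_999L, 12.0);
        System.out.println(lookup.closestPreceding("A", 20_000L)); // entry for rowtime 14999
        System.out.println(lookup.conditionMet("A", 31_000L, 12.5));
    }
}
```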

CEP sounds like a good idea, but my pattern condition on r1 needs to refer to events in the other stream (r2). Is it possible to do this with CEP?

Thanks
Ivan



On Mon, Apr 16, 2018 at 4:01 PM, Fabian Hueske <[hidden email]> wrote:

Re: assign time attribute after first window group when using Flink SQL

Fabian Hueske-2
This sounds like a windowed join between the raw stream and the aggregated stream.
It might be possible to do the "lookup" in the second raw stream with another windowed join. If not, you can fall back to the DataStream API / ProcessFunction and implement the lookup logic as you need it.

Best, Fabian
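A windowed join bounds how far apart the timestamps of the two joined rows may be. The following plain-Java nested-loop sketch shows only the semantics: a row from r1 matches every r2 row with the same key whose timestamp lies in [r1.ts - lowerMs, r1.ts + upperMs]. The Event type, its fields, and the 5-second bound are assumptions for illustration. In Flink SQL, such a join is expressed with an equality predicate on the key plus a predicate that bounds one rowtime attribute relative to the other (for example with BETWEEN); Flink evaluates it incrementally with state rather than with nested loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Nested-loop sketch of time-windowed (interval) join semantics.
// Event and IntervalJoinSketch are hypothetical names, not Flink classes.
class IntervalJoinSketch {

    static final class Event {
        final String key;
        final long ts;      // event-time timestamp in epoch millis
        final double price;

        Event(String key, long ts, double price) {
            this.key = key;
            this.ts = ts;
            this.price = price;
        }

        @Override
        public String toString() {
            return key + "@" + ts + ":" + price;
        }
    }

    // Pairs each left event with every right event that has the same key and a
    // timestamp in [left.ts - lowerMs, left.ts + upperMs].
    static List<Event[]> join(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        List<Event[]> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inBounds = r.ts >= l.ts - lowerMs && r.ts <= l.ts + upperMs;
                if (sameKey && inBounds) {
                    out.add(new Event[] {l, r});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> r1 = Arrays.asList(new Event("A", 20_000L, 12.5));
        List<Event> r2 = Arrays.asList(
                new Event("A", 12_000L, 11.0),  // 8 s earlier: outside the 5 s bound
                new Event("A", 17_000L, 12.0),  // 3 s earlier: inside
                new Event("B", 19_000L, 12.4)); // different key
        // Look back at most 5 s, never forward.
        for (Event[] pair : join(r1, r2, 5_000L, 0L)) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

The same shape applies to both joins Fabian mentions: raw stream against the aggregated stream, and then the matched rows against r2 for the lookup.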

2018-04-18 3:03 GMT+02:00 Ivan Wang <[hidden email]>: