Datastream Lag Windowing function

Datastream Lag Windowing function

s_penakalapati@yahoo.com
Hi All,

I am using Flink 1.12 to read real-time data from a Kafka topic, and the requirement is to implement a LAG window function. The approach I followed is below:

DataStream vData = env.addSource(...)
vData.keyBy(Id)
createTemporaryView
then apply Flink SQL.

My sample data is like below. The vTime field contains the timestamp when the event was generated, and vNumSeq is a unique number within a particular group Id.
[inline image: sample data, not available]

I tried the LAG function ordering by the vSeq field (long datatype). The job failed with "OVER windows' ordering in stream mode must be defined on a time attribute".

I also tried ordering by the vTime field (eventTS is also a long datatype). I tried converting this field to sql.Timestamp, but the job still failed with the same error.

The documents I found suggest using proctime/rowtime, so I modified the query to order by proctime(). The job succeeded, but with wrong results.

Kindly help with a simple example, as I am badly stuck. I am fine with using the DataStream API to implement the lag functionality instead.

Lag Query:
select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as vSeq, vdata.f4 as currentSal, LAG(vdata.f4,1,0) OVER ( partition BY vdata.f0 ORDER BY proctime()) AS prevSal from VData vData 
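
For reference, the semantics this query is after (previous f4 per f0, default 0) can be sketched in plain Java with no Flink dependency. This is only an illustration under assumptions: the Event and WithLag classes, the field types, and the sample values are hypothetical, and events are assumed to arrive already ordered by vSeq within each id. In a DataStream job, the map below would roughly correspond to per-key state (for example a ValueState inside a KeyedProcessFunction after keyBy).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of LAG(sal, 1, default) partitioned by id. Not Flink code. */
final class LagSketch {

    /** Hypothetical event shape: id (f0), sequence number (f3), salary (f4). */
    static final class Event {
        final String id;
        final long seq;
        final int sal;

        Event(String id, long seq, int sal) {
            this.id = id;
            this.seq = seq;
            this.sal = sal;
        }
    }

    /** Hypothetical output row: the event plus the previous salary for the same id. */
    static final class WithLag {
        final Event event;
        final int prevSal;

        WithLag(Event event, int prevSal) {
            this.event = event;
            this.prevSal = prevSal;
        }
    }

    /**
     * Walks the events in arrival order and remembers the last salary per id.
     * Assumes arrival order matches seq order within each id; out-of-order
     * input would need buffering or sorting first.
     */
    static List<WithLag> lag(List<Event> eventsInArrivalOrder, int defaultValue) {
        Map<String, Integer> lastSalById = new HashMap<>();
        List<WithLag> out = new ArrayList<>();
        for (Event e : eventsInArrivalOrder) {
            // First event for an id has no predecessor, so the default is used.
            int prev = lastSalById.getOrDefault(e.id, defaultValue);
            out.add(new WithLag(e, prev));
            lastSalById.put(e.id, e.sal);
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample values, only to exercise the method.
        List<Event> events = Arrays.asList(
                new Event("A", 1L, 100),
                new Event("B", 1L, 500),
                new Event("A", 2L, 150),
                new Event("A", 3L, 175));
        for (WithLag r : lag(events, 0)) {
            System.out.println(r.event.id + " seq=" + r.event.seq
                    + " sal=" + r.event.sal + " prevSal=" + r.prevSal);
        }
    }
}
```

The result depends only on the order in which events are processed per id. Ordering by proctime() has the same property, so if records reach the operator in a different order than vSeq, the previous value will differ from what ordering by vSeq would give.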

Wrong output:
[inline image: not available]

Expected:
[inline image: not available]


Regards,
Sunitha.

Re: Datastream Lag Windowing function

Roman Khachatryan
Hi,

I can see neither the wrong nor the expected output in your message. Can you re-attach them?
Could you provide the code of your pipeline, including the view creation?
Are you using the Blink planner (it can be chosen with useBlinkPlanner [1])?


[1]

Regards,
Roman


On Sun, Feb 21, 2021 at 9:40 AM [hidden email] <[hidden email]> wrote: