Hi All,

I am using Flink 1.12. I am trying to read real-time data from a Kafka topic and, as per the requirement, I need to implement a windowing LAG function.

The approach I followed is below:

    DataStream vData = env.addSource(...)
    vData.keyBy(Id)
    createTemporaryView
    then apply Flink SQL

My sample data is like below. The vTime field contains the timestamp when the event was generated, and vNumSeq is the unique number for a particular group Id.

[inline image: sample data, not available]

I tried the LAG function ordering by the vSeq field (long datatype). The job failed with "OVER windows' ordering in stream mode must be defined on a time attribute". I also tried the vTime field (eventTS is also of long datatype), and I tried converting this field to sql.Timestamp; still no luck, the job failed with the same error.

The few documents I referred to suggested using proctime/rowtime as the solution, so I modified the query to use proctime(). The job succeeded, but with wrong results.

Kindly help with a simple example, I am badly stuck. I am OK with using even the DataStream API to implement the lag functionality.

Lag query:

    select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as vSeq, vdata.f4 as currentSal,
           LAG(vdata.f4, 1, 0) OVER (partition BY vdata.f0 ORDER BY proctime()) AS prevSal
    from VData vData

Wrong output:

[inline image: wrong output, not available]

Expected:

[inline image: expected output, not available]

Regards,
Sunitha.
Hi,

I can see neither the wrong nor the expected output in your message. Can you re-attach them? Could you also provide the code of your pipeline, including the view creation? Are you using the Blink planner (it can be chosen with useBlinkPlanner [1])?

[1]

Regards,
Roman
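
Since the DataStream API was mentioned as an acceptable fallback, here is a minimal plain-Java sketch of the per-key logic that LAG(value, 1, 0) expresses. It is not Flink code and not a confirmed fix for the job above: the class name LagByKey and its apply method are hypothetical, and it assumes records for each id arrive already ordered by sequence number. In a Flink job the map would typically be replaced by a ValueState<Long> held inside a KeyedProcessFunction after keyBy(id).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper illustrating LAG(value, 1, defaultValue) per key.
 * Assumption: records of one key are fed in their intended order
 * (for example by sequence number); no reordering is done here.
 */
final class LagByKey {
    // Last value seen for each key; stands in for Flink keyed state.
    private final Map<String, Long> previousByKey = new HashMap<>();
    private final long defaultValue;

    LagByKey(long defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Stores the current value and returns the previous one for this key. */
    long apply(String key, long current) {
        Long previous = previousByKey.put(key, current);
        return previous == null ? defaultValue : previous;
    }

    public static void main(String[] args) {
        LagByKey lag = new LagByKey(0L);
        System.out.println(lag.apply("id-1", 1000L)); // first record of id-1: default
        System.out.println(lag.apply("id-1", 1200L)); // previous value of id-1
        System.out.println(lag.apply("id-2", 500L));  // first record of id-2: default
    }
}
```

If records can arrive out of order, this sketch returns the previously arrived value rather than the previous value by sequence, which is the same kind of mismatch that ordering by proctime() can produce; buffering and sorting by event time would be needed in that case.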