Hi -
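I’m streaming events from Kafka and processing them in event time. I’d like to process only events that are older than some given time (say, 2 days ago), at an interval of 5 minutes. I’ve experimented with Flink dynamic tables:

    String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
        " FROM " + rawTable +
        " WHERE status='RETRY'" +
        " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";

But this ignores events that are older than 5 minutes. Here’s my timestamp assigner:

    public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {

        private final long maxTimeLag = 2000; // 2 seconds

        @Override
        public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
            return rr.getLastTry().getTime();
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
    }

So, a couple of questions:

1. How can I get this query to recognize earlier events (before 5 minutes ago)?
2. Is using a dynamic table a good solution, or could I accomplish the same thing using DataStream windowing?

Thanks -

-- Cindy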
This is close:

    String query = "SELECT pid, status, lastTry " +

But I need to have a stream/table that will dynamically update every 30 seconds with only events that were not in the last query.

On 2019/12/19 16:21:28, Cindy McMullen <c...@oracle.com> wrote:
> Hi -
>
> I’m streaming events from Kafka, processing in EventTime. I’d like to process only events that are older (before) some given time (say, 2 days ago) at an interval of 5 minutes. I’ve experimented with Flink DynamicTables:
>
>     String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
>         " FROM " + rawTable +
>         " WHERE status='RETRY'" +
>         " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
>
> But this ignores events that are older than 5 minutes. Here’s my timestamp assigner:
>
>     public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {
>
>         private final long maxTimeLag = 2000; // 2 seconds
>
>         @Override
>         public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>             return rr.getLastTry().getTime();
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             return new Watermark(System.currentTimeMillis() - maxTimeLag);
>         }
>     }
>
> So, a couple of questions:
>
> 1. How can I get this query to recognize earlier events (before 5 minutes ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same thing using DataStream windowing?
>
> Thanks -
>
> -- Cindy
In reply to this post by Cindy McMullen
Never mind. Flink docs state that the query is an append, not an update, so the query is working as expected.
A better solution is something along the lines of this:

    String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable +

which can be modified to select on desired fields.
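
For illustration only, here is the age check behind the TIMESTAMPDIFF idea, sketched in plain Java outside Flink. It computes the whole minutes between lastTry and the current time, then keeps only the entries that are at least a given age (2 days in the example). The class name AgeFilterSketch, the helper names ageInMinutes and olderThan, and the sample timestamps are assumptions made for this sketch. None of them come from the original job, and this is not a Flink API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, not part of Flink or the original job.
public class AgeFilterSketch {

    // Roughly mirrors TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP):
    // the number of complete minutes from lastTry to now.
    public static long ageInMinutes(LocalDateTime lastTry, LocalDateTime now) {
        return ChronoUnit.MINUTES.between(lastTry, now);
    }

    // Keeps only the timestamps that are at least minAge old relative to now.
    public static List<LocalDateTime> olderThan(
            List<LocalDateTime> lastTries, LocalDateTime now, Duration minAge) {
        long minMinutes = minAge.toMinutes();
        return lastTries.stream()
                .filter(t -> ageInMinutes(t, now) >= minMinutes)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Fixed "now" so the example is deterministic; a real job would use the current time.
        LocalDateTime now = LocalDateTime.of(2019, 12, 19, 16, 0);
        List<LocalDateTime> lastTries = Arrays.asList(
                now.minusDays(3),   // older than 2 days: kept
                now.minusHours(1),  // too recent: dropped
                now.minusDays(2));  // exactly 2 days: kept
        System.out.println(olderThan(lastTries, now, Duration.ofDays(2)));
    }
}
```

In the SQL version the same comparison would presumably go into a WHERE clause on the TIMESTAMPDIFF expression. That part is left out here because the rest of the query is not shown above.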