Querying DataStream for events before a given time

Cindy McMullen
Hi -

I’m streaming events from Kafka and processing them in event time. I’d like to process only events that are older than some given time (say, 2 days ago), checking at an interval of 5 minutes. I’ve experimented with Flink dynamic tables:

String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
    " FROM " + rawTable +
    " WHERE status='RETRY'" +
    " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";


But this ignores events that are older than 5 minutes.  Here’s my timestamp assigner:
public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {

  private final long maxTimeLag = 2000; // 2 seconds

  @Override
  public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
    return rr.getLastTry().getTime();
  }

  @Override
  public Watermark getCurrentWatermark() {
    return new Watermark(System.currentTimeMillis() - maxTimeLag);
  }
}
So, a couple of questions:

1. How can I get this query to recognize earlier events (before 5 minutes ago)?
2. Is using Dynamic Table a good solution, or could I accomplish the same thing using DataStream windowing?

Thanks -

— Cindy
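
One possible explanation for the skipped events, offered as a sketch rather than a diagnosis: the assigner above takes each event's timestamp from lastTry but derives the watermark from the wall clock, so an event whose lastTry is far in the past falls into a window that the watermark has already passed. The plain-Java example below models that check without any Flink dependency. The class name LatenessSketch and its helper methods are hypothetical, and the rule used (an event is late once its window's last millisecond is at or before the watermark, with zero allowed lateness) is an assumption about how the windowed query treats such events.

```java
import java.time.Duration;

public class LatenessSketch {

    // Start of the tumbling window that contains the given event timestamp.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Assumed lateness rule: the event is late once the watermark has reached
    // the last millisecond of its window (zero allowed lateness).
    public static boolean isLate(long eventTimestampMs, long watermarkMs, long windowSizeMs) {
        long windowEnd = windowStart(eventTimestampMs, windowSizeMs) + windowSizeMs;
        return windowEnd - 1 <= watermarkMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long now = System.currentTimeMillis();
        long watermark = now - 2000; // mirrors maxTimeLag in the assigner above
        long twoDaysAgo = now - Duration.ofDays(2).toMillis();

        System.out.println("2-day-old event late? " + isLate(twoDaysAgo, watermark, size));
        System.out.println("current event late?   " + isLate(now, watermark, size));
    }
}
```

If this is what is happening, a watermark derived from the event timestamps seen so far, rather than from System.currentTimeMillis(), may be worth trying.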
Re: Querying DataStream for events before a given time

Cindy McMullen
This is close:

String query = "SELECT pid, status, lastTry " +
    " FROM " + rawTable +
    " WHERE status='RECOVERABLE'" +
    " GROUP BY HOP(UserActionTime, INTERVAL '30' SECOND, INTERVAL '5' HOUR), pid, status, lastTry";
But I need to have a stream/table that will dynamically update every 30 seconds with only events that were not in the last query.
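
The requirement of returning only events that were not in the last query can be illustrated outside Flink with a small amount of state. The sketch below is plain Java and makes several assumptions: the class IncrementalSelector, the Event type, and the method selectNew are all hypothetical, and an event is identified by its pid together with its lastTry value. It shows the bookkeeping only, not how to wire it into a Flink job.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IncrementalSelector {

    // Hypothetical stand-in for the event type; fields mirror the query columns.
    public static final class Event {
        final String pid;
        final String status;
        final long lastTryMs;

        public Event(String pid, String status, long lastTryMs) {
            this.pid = pid;
            this.status = status;
            this.lastTryMs = lastTryMs;
        }
    }

    // Keys of events already returned by an earlier call.
    private final Set<String> alreadyEmitted = new HashSet<>();

    // Returns events with the given status whose lastTry is before the cutoff
    // and which no previous call has returned.
    public List<Event> selectNew(List<Event> events, String status, long cutoffMs) {
        List<Event> fresh = new ArrayList<>();
        for (Event e : events) {
            boolean matches = e.status.equals(status) && e.lastTryMs < cutoffMs;
            // Assumption: pid plus lastTry uniquely identifies an event.
            String key = e.pid + "@" + e.lastTryMs;
            if (matches && alreadyEmitted.add(key)) {
                fresh.add(e);
            }
        }
        return fresh;
    }
}
```

Calling selectNew every 30 seconds with an advancing cutoff would return each matching event once. The set grows without bound in this form, so a real job would need some way to expire old keys.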


On 2019/12/19 16:21:28, Cindy McMullen <c...@oracle.com> wrote:

> Hi -
>
> I’m streaming events from Kafka, processing in EventTime.  I’d like to process only events that are older (before) some given time (say, 2 days ago) at an interval of 5 minutes.  I’ve experimented with Flink DynamicTables:
>
> String query = "SELECT pid, status, lastTry, TUMBLE_END(UserActionTime, INTERVAL '5' MINUTE) as endT " +
>     " FROM " + rawTable +
>     " WHERE status='RETRY'" +
>     " GROUP BY status, pid, lastTry, TUMBLE(UserActionTime, INTERVAL '5' MINUTE)";
>
> But this ignores events that are older than 5 minutes.  Here’s my timestamp assigner:
> public class TimeLagWatermarkAssigner implements AssignerWithPeriodicWatermarks<RedactionResult> {
>
>   private final long maxTimeLag = 2000; // 2 seconds
>
>   @Override
>   public long extractTimestamp(RedactionResult rr, long previousElementTimestamp) {
>     return rr.getLastTry().getTime();
>   }
>
>   @Override
>   public Watermark getCurrentWatermark() {
>     return new Watermark(System.currentTimeMillis() - maxTimeLag);
>   }
> }
> So, a couple of questions:
>
> 1. How can I get this query to recognize earlier events (before 5 minutes ago)?
> 2. Is using Dynamic Table a good solution, or could I accomplish the same thing using DataStream windowing?
>
> Thanks -
>
> — Cindy
Re: Querying DataStream for events before a given time

Cindy McMullen
In reply to this post by Cindy McMullen
Never mind.  Flink docs state that the query is an append, not an update, so the query is working as expected.

A better solution is something along the lines of this:
String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable +
" WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";
which can be modified to select on desired fields.
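
The WHERE clause above keeps rows whose lastTry lies more than 30 minutes before the current local time. For readers who want to check the boundary behaviour, here is a plain-Java sketch of the same predicate. The class AgeFilterSketch and its methods are hypothetical, and the sketch assumes that TIMESTAMPDIFF(MINUTE, ...) counts whole minutes, as ChronoUnit.MINUTES.between does.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class AgeFilterSketch {

    // Whole minutes elapsed from lastTry to now (partial minutes are dropped).
    public static long minutesSince(LocalDateTime lastTry, LocalDateTime now) {
        return ChronoUnit.MINUTES.between(lastTry, now);
    }

    // Mirrors the predicate TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > minutes.
    public static boolean olderThan(LocalDateTime lastTry, LocalDateTime now, long minutes) {
        return minutesSince(lastTry, now) > minutes;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(olderThan(now.minusMinutes(45), now, 30));
        System.out.println(olderThan(now.minusMinutes(10), now, 30));
    }
}
```

Because the comparison is strict and works on whole minutes, an event that is 30 minutes and 59 seconds old is not selected under these assumptions; it is picked up once it is 31 minutes old.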
