Triggering temporal queries

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Triggering temporal queries

Cindy McMullen
This code runs and returns the correct result on the initial query, but fails to trigger as data continues to stream in on Kafka.  Is there anything obvious I’m missing?

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
tableEnv = StreamTableEnvironment.create(env);

// Consume RedactionResults from Kafka into DataStream
DataStream<RedactionResult> rrStream =
env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
Table rawTable = tableEnv.fromDataStream(rrStream, "lastTry, pid, tid, status, UserActionTime.proctime");
rawTable.printSchema();

// This works on initial query, but fails to trigger afterwards.
String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +
" WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);

Reply | Threaded
Open this post in threaded view
|

Re: Triggering temporal queries

Cindy McMullen
Never mind.  The code is correct; the input test data was not. All is well.

 FWIW, it’s useful while debugging to select the results of the time function itself:

String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable +
" WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);

TupleTypeInfo<Tuple3<Timestamp,Timestamp, Integer>> typeInfoTs =
new TupleTypeInfo<>( Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.INT());
tableEnv.toAppendStream(qTable, typeInfoTs)
.process(new ProcessFunction<Tuple3<Timestamp,Timestamp, Integer>, Tuple3<Timestamp,Timestamp, Integer>>() {
@Override
public void processElement(Tuple3<Timestamp, Timestamp, Integer> t, Context context,
Collector<Tuple3<Timestamp,Timestamp, Integer>> collector) throws Exception {
logger.debug("QR: " + t);
collector.collect(t);
}
})
.addSink(new DiscardingSink<>());
env.execute();
19/12/19 17:50:37 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:32:40.58,2019-12-19 17:50:37.955,77)
19/12/19 17:50:46 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:42:40.58,2019-12-19 17:50:46.955,68)
19/12/19 17:50:55 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:52:40.58,2019-12-19 17:50:55.958,58)


On 2019/12/19 21:41:17, Cindy McMullen <c...@oracle.com> wrote:

> This code runs and returns the correct result on the initial query, but fails to trigger as data continues to stream in on Kafka.  Is there anything obvious I’m missing?>
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);>
> tableEnv = StreamTableEnvironment.create(env);>
>
> // Consume RedactionResults from Kafka into DataStream>
> DataStream<RedactionResult> rrStream =>
>     env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());>
>  Table rawTable = tableEnv.fromDataStream(rrStream, "lastTry, pid, tid, status, UserActionTime.proctime");>
> rawTable.printSchema();>
>
> // This works on initial query, but fails to trigger afterwards.>
> String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +>
>     " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";>
> logger.debug("Query: " + query);>
>
> Table qTable = tableEnv.sqlQuery(query);>
>
>