This code runs and returns the correct result on the initial query, but fails to trigger as data continues to stream in on Kafka. Is there anything obvious I’m missing?
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Never mind. The code is correct; the input test data was not. All is well.
FWIW, it’s useful while debugging to select the results of the time function itself:

String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable +

19/12/19 17:50:37 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:32:40.58,2019-12-19 17:50:37.955,77)
19/12/19 17:50:46 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:42:40.58,2019-12-19 17:50:46.955,68)
19/12/19 17:50:55 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:52:40.58,2019-12-19 17:50:55.958,58)

On 2019/12/19 21:41:17, Cindy McMullen <c...@oracle.com> wrote:
> This code runs and returns the correct result on the initial query, but fails to trigger as data continues to stream in on Kafka. Is there anything obvious I’m missing?
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> tableEnv = StreamTableEnvironment.create(env);
>
> // Consume RedactionResults from Kafka into DataStream
> DataStream<RedactionResult> rrStream =
>     env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
> Table rawTable = tableEnv.fromDataStream(rrStream, "lastTry, pid, tid, status, UserActionTime.proctime");
> rawTable.printSchema();
>
> // This works on initial query, but fails to trigger afterwards.
> String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +
>     " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
> logger.debug("Query: " + query);
>
> Table qTable = tableEnv.sqlQuery(query);
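For anyone wanting to cross-check values like the 77, 68, and 58 in the debug output without running the Flink job, here is a minimal sketch in plain java.time. The class and method names (MinuteDiffCheck, minutesBetween) are hypothetical, and it assumes TIMESTAMPDIFF(MINUTE, ...) counts whole elapsed minutes, which matches the logged rows but is not taken from the Flink documentation.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of the original job: reproduces the
// minute differences seen in the debug log using only java.time.
public class MinuteDiffCheck {

    // Whole minutes elapsed from start to end; partial minutes are dropped.
    // Assumption: this mirrors what TIMESTAMPDIFF(MINUTE, start, end) returned in the log.
    public static long minutesBetween(LocalDateTime start, LocalDateTime end) {
        return ChronoUnit.MINUTES.between(start, end);
    }

    public static void main(String[] args) {
        // First logged row: lastTry = 2019-12-19 16:32:40.58, LOCALTIMESTAMP = 2019-12-19 17:50:37.955
        LocalDateTime lastTry = LocalDateTime.of(2019, 12, 19, 16, 32, 40, 580_000_000);
        LocalDateTime now = LocalDateTime.of(2019, 12, 19, 17, 50, 37, 955_000_000);
        System.out.println(minutesBetween(lastTry, now)); // prints 77
    }
}
```

If the value computed this way disagrees with what the query returns for the same lastTry, the input data (as turned out to be the case here) or a time zone mismatch is a more likely culprit than the WHERE clause.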