I read messages from Kafka, keyBy tableName, and then write each list of messages to the database with batchUpdate:
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink());
It seems that the aggregate function is triggered for every incoming record.
But I want it to be triggered only once per window.
How can I implement this?
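
To make clear what I mean, here is a minimal sketch without any Flink dependency. The Aggregate interface is a hypothetical stand-in that only mirrors the shape of Flink's AggregateFunction (createAccumulator, add, getResult), and runWindow is my own simulation of a single window for a single key, not Flink code. As far as I understand the contract, add() runs once per record while getResult() runs once when the window fires, and I want the sink to be called only with that final list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowAggregateSketch {

    // Hypothetical stand-in mirroring the shape of Flink's AggregateFunction<IN, ACC, OUT>.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    // Collects all records of one window into a list and counts the calls.
    static class ListAggregate implements Aggregate<String, List<String>, List<String>> {
        int addCalls = 0;
        int getResultCalls = 0;

        @Override
        public List<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> add(String value, List<String> acc) {
            addCalls++;          // invoked for every incoming record
            acc.add(value);
            return acc;
        }

        @Override
        public List<String> getResult(List<String> acc) {
            getResultCalls++;    // invoked once, when the window fires
            return acc;
        }
    }

    // Simulates one window for one key: add() per record, getResult() once at the end.
    static <IN, ACC, OUT> OUT runWindow(Aggregate<IN, ACC, OUT> agg, List<IN> records) {
        ACC acc = agg.createAccumulator();
        for (IN record : records) {
            acc = agg.add(record, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        ListAggregate agg = new ListAggregate();
        List<String> batch = runWindow(agg, Arrays.asList("row1", "row2", "row3"));
        System.out.println("add calls: " + agg.addCalls);
        System.out.println("getResult calls: " + agg.getResultCalls);
        System.out.println("batch for the sink: " + batch);
    }
}
```

In this simulation the sink would see one list of three rows, which is the behavior I am looking for with batchUpdate.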
Thanks,
Lei