Hello,
I'm new to Flink and I need some advice on the best approach to do the following:
- read some items from a Kafka topic;
- on the Flink stream side, after some simple filtering steps, group these items into batches by Flink processing time;
- insert the items into a PostgreSQL database using a batch insert.

I did this by using a time window of 1 second and adding a custom sink that collects items in a blocking queue. Additionally, I need a separate thread that triggers the commit to the database after some time shorter than the window's duration.

The solution works, but I'm not very pleased with it because it looks very complicated for a simple item-batching task.

Is there any way to trigger the commit directly when the window is closed? I didn't find any way to get notified when the window is complete. I would like to get rid of this separate thread whose only job is to trigger the batch insert.

Any other possible solution would be highly appreciated. :)
Thanks
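The last step described above, one batch insert per batch, can be sketched in plain JDBC. This is a minimal sketch, not the poster's sink: it assumes a hypothetical table `items` with a single text column `payload`, and an already-open `java.sql.Connection` (for PostgreSQL, typically obtained through the PostgreSQL JDBC driver). It shows that once a batch is available as one complete `List`, writing it takes a single `executeBatch()` and a single `commit()`, with no timer thread involved.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper: writes one complete batch (e.g. everything collected
// in one closed 1-second window) as a single JDBC batch with a single commit.
// The table "items" and column "payload" are assumptions for illustration.
class BatchWriter {
    static final String INSERT_SQL = "INSERT INTO items (payload) VALUES (?)";

    static int[] insertBatch(Connection conn, List<String> batch) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // group all rows into one transaction
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            for (String item : batch) {
                ps.setString(1, item);
                ps.addBatch(); // queue the row instead of sending it immediately
            }
            int[] counts = ps.executeBatch(); // send all queued rows at once
            conn.commit();
            return counts;
        } catch (SQLException e) {
            conn.rollback(); // do not leave a half-written batch behind
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit); // restore the caller's setting
        }
    }
}
```

A sink could call `BatchWriter.insertBatch(conn, batch)` once per incoming batch; whether the batch arrives as one element depends on how the windowing step upstream emits it.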
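You can specify a custom trigger based on the default ProcessingTimeTrigger (if you are working with processing time) or EventTimeTrigger (if you are working with event time). Both of these only have private constructors, so rather than subclassing them, write your own Trigger that copies and adjusts their logic.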
You do it like this:

    stream.timeWindow(Time.seconds(1)).trigger(new MyTrigger())

(timeWindow is defined on a KeyedStream; on a non-keyed stream use timeWindowAll instead.) Check out the Trigger implementations to get the behaviour you want ([1], [2], [3]).

@Kostas: Is there another way?

– Ufuk

[1] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
[2] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
[3] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

On 11 November 2016 at 14:20:38, criss wrote:
> Is there any way to trigger the commit directly when the window is closed?
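The reply suggests letting a trigger decide when a window's batch is released. To make that behaviour concrete, here is a plain-Java model of a 1-second tumbling processing-time window whose trigger hands the complete batch downstream exactly once, when processing time reaches the end of the window. `TumblingBatcher`, `add` and `advanceTo` are hypothetical names; this is not Flink's API and only approximates the default trigger (see the linked ProcessingTimeTrigger source for the real timer logic).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model (NOT Flink's API) of a tumbling processing-time window
// whose trigger releases the whole batch once, when the window ends.
class TumblingBatcher<T> {
    private final long sizeMillis;
    private final Consumer<List<T>> onWindowClosed; // e.g. a JDBC batch insert
    private final List<T> buffer = new ArrayList<>();
    private long windowEnd = Long.MIN_VALUE; // exclusive end of the open window

    TumblingBatcher(long sizeMillis, Consumer<List<T>> onWindowClosed) {
        this.sizeMillis = sizeMillis;
        this.onWindowClosed = onWindowClosed;
    }

    // Roughly a trigger's onElement: close an expired window first, then
    // assign the element to its window and remember when that window ends.
    void add(T item, long nowMillis) {
        advanceTo(nowMillis);
        if (buffer.isEmpty()) {
            long start = nowMillis - Math.floorMod(nowMillis, sizeMillis);
            windowEnd = start + sizeMillis;
        }
        buffer.add(item);
    }

    // Roughly a processing-time timer firing: once the clock reaches the
    // window end, emit the complete batch and start over with an empty buffer.
    void advanceTo(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis >= windowEnd) {
            onWindowClosed.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

For example, with a 1000 ms window, items added at 100 ms and 900 ms are emitted together as one batch as soon as the clock reaches 1000 ms. In Flink itself, the role of `advanceTo` is played by processing-time timers that the runtime fires for the window operator, so the batch is produced when the window closes without a thread of your own.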