Kafka Stream to Database batch inserts


Kafka Stream to Database batch inserts

criss
Hello,

I'm new to Flink and I need some advice on the best approach to do the following:
- read items from a Kafka topic
- on the Flink stream side, after some simple filtering steps, group these items into batches by Flink processing time
- insert the items into a PostgreSQL database using a batch insert

I did this by using a time window of 1 second and adding a custom sink which collects items in a blocking queue. Additionally, I need a separate thread which triggers the commit to the database at some interval smaller than the window's duration.
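To make the setup concrete, here is a rough sketch of the kind of sink described above (not the actual code: `Item`, the table, the column names, and the connection URL are placeholders, and it assumes Flink's `RichSinkFunction` plus a PostgreSQL JDBC driver on the classpath):

```java
// Rough sketch of the approach described above: the sink buffers incoming
// items in a blocking queue, and a separate scheduled thread flushes the
// buffer to PostgreSQL as one JDBC batch insert.
public class BufferedJdbcSink extends RichSinkFunction<Item> {
    private transient BlockingQueue<Item> buffer;
    private transient ScheduledExecutorService flusher;
    private transient Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        buffer = new LinkedBlockingQueue<>();
        conn = DriverManager.getConnection("jdbc:postgresql://...");
        flusher = Executors.newSingleThreadScheduledExecutor();
        // Flush more often than the 1-second window length.
        flusher.scheduleAtFixedRate(this::flush, 500, 500, TimeUnit.MILLISECONDS);
    }

    @Override
    public void invoke(Item item) throws Exception {
        buffer.put(item);
    }

    private void flush() {
        List<Item> batch = new ArrayList<>();
        buffer.drainTo(batch);
        if (batch.isEmpty()) {
            return;
        }
        try (PreparedStatement stmt =
                 conn.prepareStatement("INSERT INTO items (id, value) VALUES (?, ?)")) {
            for (Item item : batch) {
                stmt.setLong(1, item.getId());
                stmt.setString(2, item.getValue());
                stmt.addBatch();
            }
            stmt.executeBatch(); // one round trip for the whole batch
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        flusher.shutdown();
        flush(); // drain whatever is left
        conn.close();
    }
}
```

The separate flusher thread is exactly the part that feels redundant, since the window already defines when a batch is complete.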

The solution works, but I am not very pleased with it because it looks very complicated for a simple item-batching task.

Is there any way to trigger the commit directly when the window closes? I didn't find any way to get notified when the window is completed. I would like to get rid of this separate thread whose only purpose is triggering the batch insert.

Any other possible solution would be highly appreciated. :)
Thanks

Re: Kafka Stream to Database batch inserts

Ufuk Celebi
You can specify a custom trigger that extends the default ProcessingTimeTrigger (if you are working with processing time) or EventTimeTrigger (if you are working with event time).

You do it like this:

stream.timeWindow(Time.of(1, SECONDS)).trigger(new MyTrigger())

Check out the Trigger implementations to get the behaviour you want ([1], [2], [3]).
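For illustration, a trigger along those lines might look like this — a sketch modeled on the ProcessingTimeTrigger linked in [3], not tested against 1.1; `MyTrigger` is just a placeholder name:

```java
// Sketch of a trigger modeled on ProcessingTimeTrigger (Flink 1.1 Trigger API):
// it fires AND purges the window when the processing-time timer expires,
// so downstream operators receive the batch exactly when the window closes.
public class MyTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx)
            throws Exception {
        // Register a timer for the end of the window.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) throws Exception {
        // The window's end has been reached: emit and discard its contents.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
                                     TriggerContext ctx) throws Exception {
        // Event-time timers are irrelevant for a processing-time window.
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}
```

With FIRE_AND_PURGE the sink sees each window's contents exactly once, so the commit can happen directly in the sink instead of in a separate thread.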

@Kostas: Is there another way?

– Ufuk

[1] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

[2] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

[3] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

On 11 November 2016 at 14:20:38, criss ([hidden email]) wrote:



Re: Kafka Stream to Database batch inserts

criss
Hi,

Thank you very much for your hint! It works pretty well.
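For anyone landing on this thread later, here is roughly how the pieces fit together — a sketch rather than the actual code, where `Item`, `MyTrigger`, and `BatchInsertSink` are placeholder names and the Flink 1.1 DataStream API is assumed:

```java
// Sketch: the custom trigger fires and purges when the window closes, the
// window function emits the whole pane as one list, and the sink can commit
// it as a single JDBC batch insert in invoke() -- no separate flusher thread.
stream
    .timeWindowAll(Time.seconds(1))
    .trigger(new MyTrigger())
    .apply(new AllWindowFunction<Item, List<Item>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<Item> values,
                          Collector<List<Item>> out) {
            List<Item> batch = new ArrayList<>();
            for (Item item : values) {
                batch.add(item);
            }
            out.collect(batch); // the whole window becomes one batch
        }
    })
    .addSink(new BatchInsertSink()); // one batch insert per window
```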