Making batches of small messages

Making batches of small messages

Gwenhael Pasquiers

Hi,

Sorry if this has already been asked.

For performance reasons (streaming as well as batch), I’d like to group messages (let’s say in batches of 1000) before sending them to my sink (Kafka, but mainly ES) to reduce the per-message overhead.

I’ve seen the “countWindow” operation, but if I’m not mistaken the parallelism of such an operation is 1. Moreover, I’d need some kind of timeout (send the current batch to the next operator after 5 s if it has not reached 1000 messages by then).

I could also create a flatMap “String to List<String>” that accumulates messages until it reaches 1000 and then emits them. However, that does not solve the timeout issue (I’m not sure I could call out.collect() from a Timer thread). More importantly, I’m afraid it would break the exactly-once guarantee in case of a crash: Flink could not know that I was buffering messages; for all it knows, I could be filtering them.

My sink could also create the chunks with its own timer and counter, but I’m afraid that would also break the exactly-once guarantee, since after a crash Flink would have no way of knowing whether a message was really sent or only buffered.

Is there a proper way to do what I want?

Thanks in advance,

Gwenhaël PASQUIERS

Re: Making batches of small messages

Fabian Hueske-2
Hi,

I think this is a case for the ProcessFunction, which was recently added and will be included in Flink 1.2.
ProcessFunction lets you register timers (so the 5-second timeout can be addressed). You can maintain the fault-tolerance guarantees if you collect the records in managed state. That way they will be included in checkpoints and restored in case of a failure.

If you are on Flink 1.1.x, you will need to implement a custom operator, which is a much lower-level interface.
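
To make the count-or-timeout behaviour concrete, here is a minimal sketch in plain Java, deliberately written without any Flink dependency so that it runs on its own. The class name `CountOrTimeoutBatcher` and its methods are hypothetical. In a real ProcessFunction the buffer would be kept in managed state so that it is checkpointed, and the deadline would be a timer registered with Flink rather than a field; this sketch only illustrates the flushing logic and provides no fault tolerance by itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: buffers messages and releases them on size or on timeout. */
final class CountOrTimeoutBatcher {
    private final int maxSize;
    private final long timeoutMillis;
    private final List<String> buffer = new ArrayList<>();
    private long deadline = -1L; // -1 means "no timer armed"

    CountOrTimeoutBatcher(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds one message; returns a full batch, or an empty list if nothing is due yet. */
    List<String> add(String message, long nowMillis) {
        if (buffer.isEmpty()) {
            // Arm the timeout when the first message of a new batch arrives.
            deadline = nowMillis + timeoutMillis;
        }
        buffer.add(message);
        return buffer.size() >= maxSize ? flush() : new ArrayList<>();
    }

    /** Timer callback: flushes a partial batch once the deadline has passed. */
    List<String> onTimer(long nowMillis) {
        boolean due = deadline >= 0 && nowMillis >= deadline && !buffer.isEmpty();
        return due ? flush() : new ArrayList<>();
    }

    /** Hands out a copy of the buffered messages and resets the buffer and timer. */
    private List<String> flush() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1L;
        return batch;
    }

    public static void main(String[] args) {
        CountOrTimeoutBatcher b = new CountOrTimeoutBatcher(3, 5000L);
        System.out.println(b.add("a", 0L));
        System.out.println(b.add("b", 100L));
        System.out.println(b.onTimer(4999L));
        System.out.println(b.onTimer(5000L));
    }
}
```

Running `main` prints `[]` three times and then `[a, b]`: the batch never reaches three messages, so it is released by the timeout, 5000 ms after the first message was buffered.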

Best, Fabian

In reply to Gwenhael Pasquiers's message of 2017-01-11 17:16 GMT+01:00.


RE: Making batches of small messages

Gwenhael Pasquiers

Thanks,

We are eagerly waiting for the 1.2 release :)

 

 

In reply to Fabian Hueske's message of Wednesday, 11 January 2017 18:32.

 

Re: Making batches of small messages

Kostas Kloudas
Hi,

Fabian is right. 

The only thing I have to add is that if you have parallelism > 1, each task only knows its local count of buffered messages. In other words, with a parallelism of 2 and a batching threshold of 1000 messages, each of the parallel tasks has to reach this threshold on its own before flushing to your sink. If task 0 has buffered 501 messages and task 1 has 600, neither will flush, even though 1101 messages are buffered in total.

This can be resolved with your timeout, but it may be worth adjusting your threshold according to the parallelism of your job to avoid memory issues that may arise (depending on the state backend you are using).
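
One possible way to adjust the threshold, assuming messages are spread roughly evenly across tasks, is to divide the global batch size by the parallelism. The sketch below only illustrates that arithmetic; `PerTaskThreshold` and its methods are hypothetical names, not part of Flink.

```java
/** Hypothetical helper: derives a per-task batch threshold from a global one. */
final class PerTaskThreshold {
    /** Global threshold divided by the parallelism, rounded up. */
    static int perTask(int globalThreshold, int parallelism) {
        if (globalThreshold < 1 || parallelism < 1) {
            throw new IllegalArgumentException("both arguments must be >= 1");
        }
        return (globalThreshold + parallelism - 1) / parallelism;
    }

    /** Most messages that can sit in buffers across all tasks without any task flushing. */
    static long maxBufferedWithoutFlush(int perTaskThreshold, int parallelism) {
        return (long) (perTaskThreshold - 1) * parallelism;
    }

    public static void main(String[] args) {
        int perTask = perTask(1000, 2);
        System.out.println(perTask);                             // 500
        System.out.println(maxBufferedWithoutFlush(1000, 2));    // 1998
        System.out.println(maxBufferedWithoutFlush(perTask, 2)); // 998
    }
}
```

With a per-task threshold of 500, the two tasks in the example above (501 and 600 buffered messages) would each have flushed already, and the amount that can accumulate before any flush drops from 1998 to 998 messages.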

Kostas

In reply to Gwenhael Pasquiers's message of Jan 12, 2017, 10:09 AM.