Passing parameters to filter function (in DataStreams)


Komal Mariam
Hi everyone, 

Suppose I have to compute a filter condition

Integer threshold = computeThreshold();

If I write:

temperatureStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer temperature) throws Exception {
        Integer threshold = computeThreshold();
        return temperature > threshold;
    }
});

would this mean the threshold is recomputed over and over again, once for every new element in the stream?

My threshold does not change once it has been computed, so I don't want to recompute it for every new element. Is there a way I can pass it as a parameter to the filter function?

Reply | Threaded
Open this post in threaded view
|

Re: Passing parameters to filter function (in DataStreams)

Chesnay Schepler
You can compute the threshold ahead of time and reference it directly in the filter function.

(Below are 2 examples, depending on whether you like lambdas or not)
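// Option 1: anonymous FilterFunction; the threshold is computed once, before filter() is called
final int threshold = computeThreshold();

temperatureStream.filter(new FilterFunction<Integer>() {
   @Override
   public boolean filter(Integer temperature) {
      return temperature > threshold;
   }
});

// Option 2: a lambda capturing the same precomputed value
final int threshold = computeThreshold();

temperatureStream.filter(temperature -> temperature > threshold);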
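To make the difference concrete, here is a minimal plain-Java sketch. It uses java.util.function.Predicate and a Java stream as a stand-in for a DataStream so that it runs without Flink on the classpath; computeThreshold() and its return value of 25 are hypothetical. The counter shows that computing the value inside the predicate runs it once per element, while computing it first and capturing it runs it once.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java stand-in (no Flink) that counts how often the threshold is computed.
class ThresholdCaptureDemo {
    // Counts invocations of the (hypothetical) expensive computation.
    static int calls = 0;

    // Hypothetical stand-in for the poster's computeThreshold().
    static int computeThreshold() {
        calls++;
        return 25;
    }

    // Version 1: threshold computed inside the predicate, i.e. once per element.
    static List<Integer> filterRecomputing(List<Integer> temps) {
        Predicate<Integer> p = t -> t > computeThreshold();
        return temps.stream().filter(p).collect(Collectors.toList());
    }

    // Version 2: threshold computed once up front and captured by the lambda.
    static List<Integer> filterCaptured(List<Integer> temps) {
        final int threshold = computeThreshold();
        Predicate<Integer> p = t -> t > threshold;
        return temps.stream().filter(p).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> temps = List.of(20, 30, 22, 27, 19);

        calls = 0;
        System.out.println(filterRecomputing(temps) + " calls=" + calls); // [30, 27] calls=5

        calls = 0;
        System.out.println(filterCaptured(temps) + " calls=" + calls);    // [30, 27] calls=1
    }
}
```

In a Flink job the same reasoning applies: code placed before the call to filter() runs once in the client program, and the captured value becomes part of the function object that Flink serializes and ships to the workers.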

On 08/10/2019 12:46, Komal Mariam wrote:

Re: Passing parameters to filter function (in DataStreams)

Komal Mariam
Thank you @Chesnay!

I also managed to pass arguments to a RichFilterFunction by defining a constructor, MyFilterFunc(Integer threshold), and creating the function with new MyFilterFunc(threshold).
If there's a better way to pass arguments, I'd appreciate it if you let me know.
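A sketch of why the constructor approach works, assuming the function is shipped with plain Java serialization (Flink serializes user functions this way before distributing them). SerializableFilter below is a hypothetical stand-in for Flink's FilterFunction, and roundTrip() imitates sending the function to a worker; the threshold survives because it is an ordinary, non-transient field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-Java sketch (no Flink) of passing a parameter through the constructor.
class ConstructorParamDemo {

    // Hypothetical stand-in for Flink's FilterFunction, which is also Serializable
    // (Flink's filter() additionally declares "throws Exception").
    interface SerializableFilter<T> extends Serializable {
        boolean filter(T value);
    }

    // Mirrors the poster's MyFilterFunc: the threshold arrives via the constructor.
    static class MyFilterFunc implements SerializableFilter<Integer> {
        private final int threshold; // ordinary field, so it is serialized with the function

        MyFilterFunc(int threshold) {
            this.threshold = threshold;
        }

        @Override
        public boolean filter(Integer temperature) {
            return temperature > threshold;
        }
    }

    // Serialize + deserialize, roughly what shipping the function to a worker does.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        int threshold = 25; // imagine this came from computeThreshold() on the client
        MyFilterFunc shipped = roundTrip(new MyFilterFunc(threshold));
        System.out.println(shipped.filter(30)); // true
        System.out.println(shipped.filter(20)); // false
    }
}
```

In a real job, MyFilterFunc would extend RichFilterFunction<Integer> (or implement FilterFunction<Integer>) and be used as temperatureStream.filter(new MyFilterFunc(threshold)).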

On Tue, 8 Oct 2019 at 19:58, Chesnay Schepler <[hidden email]> wrote:

Re: Passing parameters to filter function (in DataStreams)

Theo
Hi,

From your original post, it looks like "computeThreshold" doesn't require any parameters but is simply an expensive operation.

In that case, you can inherit from "RichFilterFunction" instead of "FilterFunction". With a "RichFilterFunction", you can override the "open" method, perform the computation there just once, and store the result, e.g. in a transient field. That way, the result itself is never serialized and sent over the network. The open method is guaranteed to be called only once per parallel instance of the operator, before the first call to "filter" is made.
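A minimal plain-Java sketch of the open()/transient pattern. SimpleRichFilter is a hypothetical stand-in for RichFilterFunction, computeThreshold() is hypothetical, and runInstance() imitates one parallel instance: deserialize the function, call open() once, then call filter() per element. Running two instances shows the computation happens once per parallel instance, not once per element and not once globally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink) of "compute once in open(), keep it in a transient field".
class OpenOnceDemo {

    // Hypothetical stand-in for RichFilterFunction: open() is called once per
    // parallel instance after the function has been deserialized.
    abstract static class SimpleRichFilter<T> implements Serializable {
        void open() {}

        abstract boolean filter(T value);
    }

    static int computeCalls = 0; // counts calls to the expensive computation

    // Hypothetical expensive, parameterless computation.
    static int computeThreshold() {
        computeCalls++;
        return 25;
    }

    static class ThresholdFilter extends SimpleRichFilter<Integer> {
        // transient: never serialized, stays null until open() runs on the worker
        private transient Integer threshold;

        @Override
        void open() {
            threshold = computeThreshold();
        }

        @Override
        boolean filter(Integer temperature) {
            return temperature > threshold;
        }

        Integer currentThreshold() {
            return threshold;
        }
    }

    // Serialize + deserialize, roughly what shipping the function to a worker does.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // Imitates one parallel instance: deserialize, open() once, filter() per element.
    static List<Integer> runInstance(ThresholdFilter prototype, List<Integer> temps) {
        ThresholdFilter instance = roundTrip(prototype);
        instance.open();
        List<Integer> kept = new ArrayList<>();
        for (Integer t : temps) {
            if (instance.filter(t)) {
                kept.add(t);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        ThresholdFilter prototype = new ThresholdFilter();
        System.out.println(roundTrip(prototype).currentThreshold()); // null

        List<Integer> temps = List.of(20, 30, 22, 27, 19);
        System.out.println(runInstance(prototype, temps) + " computeCalls=" + computeCalls);
        // [30, 27] computeCalls=1
        System.out.println(runInstance(prototype, temps) + " computeCalls=" + computeCalls);
        // [30, 27] computeCalls=2 (second parallel instance)
    }
}
```

In Flink itself the class would extend RichFilterFunction<Integer> and override open(Configuration parameters) in the 1.x API; recent releases deprecate that overload in favor of open(OpenContext).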

Passing arguments through the constructor, as you did, is a perfectly fine pattern in general. For example, I often pass a connection string as a parameter to my RichFunction and establish the connection to the remote system within its open method.
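The connection-string pattern can be sketched the same way. FakeConnection is a hypothetical, deliberately non-serializable client and the connection string is made up. Only the string is a regular field; the connection is transient and created in open(). The eager variant, which opens the connection in its constructor, fails Java serialization, which is one reason the connection belongs in open() rather than in the constructor.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-Java sketch (no Flink) of "pass the connection string, connect in open()".
class ConnectionParamDemo {

    // Hypothetical client for a remote system. Like most real connection
    // objects (sockets, JDBC connections) it is NOT Serializable.
    static class FakeConnection {
        final String target;
        boolean closed = false;

        FakeConnection(String target) {
            this.target = target;
        }

        void close() {
            closed = true;
        }
    }

    // Recommended shape: only the String travels with the function.
    static class ConnectingFunction implements Serializable {
        private final String connectionString;       // serialized, sent to every instance
        private transient FakeConnection connection; // created on the worker in open()

        ConnectingFunction(String connectionString) {
            this.connectionString = connectionString;
        }

        void open() {
            connection = new FakeConnection(connectionString);
        }

        void close() {
            if (connection != null) {
                connection.close();
            }
        }

        FakeConnection connection() {
            return connection;
        }
    }

    // Anti-pattern for comparison: the live connection is a regular field.
    static class EagerConnectingFunction implements Serializable {
        private final FakeConnection connection;

        EagerConnectingFunction(String connectionString) {
            this.connection = new FakeConnection(connectionString);
        }
    }

    // Returns true if Java serialization accepts the object.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String url = "db://example-host:5432/sensors"; // hypothetical connection string
        System.out.println(isSerializable(new ConnectingFunction(url)));      // true
        System.out.println(isSerializable(new EagerConnectingFunction(url))); // false

        ConnectingFunction f = new ConnectingFunction(url);
        f.open();
        System.out.println(f.connection().target); // db://example-host:5432/sensors
        f.close();
        System.out.println(f.connection().closed); // true
    }
}
```

In Flink, the corresponding hooks are the open and close methods of a rich function such as RichFilterFunction or RichMapFunction.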

Best regards
Theo


From: "Komal Mariam" <[hidden email]>
To: "Chesnay Schepler" <[hidden email]>
CC: "user" <[hidden email]>
Sent: Thursday, 10 October 2019 04:00:46
Subject: Re: Passing parameters to filter function (in DataStreams)
