Flink Filters have state?

Timothy Victor
I have a FilterFunction implementation that accepts an argument in its constructor and stores it as an instance member. For example:

class ThresholdFilter implements FilterFunction  {

  private final MyThreshold threshold;

  private int numElementsSeen;

  public ThresholdFilter(MyThreshold threshold) {
    this.threshold = threshold;
  }

  <more code>

}

The filter uses the threshold to decide whether to keep or drop each incoming element.

All this works, but I have some gaps in my understanding.

1. How is this filter stored and recovered in the case of a failure? Is it just serialized as a POJO and stored in the configured state backend?

2. When recovered, will it maintain the state of all members? (Note that I have a numElementsSeen member in the filter, which keeps incrementing for each element received.)

3. Is this sort of thing even advisable for a filter? I'm guessing filters are meant to be reusable across operator instances, in which case the state could be wrong after recovery?

Thanks in advance

Tim

Re: Flink Filters have state?

Till Rohrmann
Hi Tim,

1. The filter is stored within the JobGraph, which is persisted to durable storage if high availability (HA) is enabled. Usually, this is HDFS, S3, or another highly available file system. The filter is just a serialized POJO, so its fields are stored with the values they had when the job was submitted. If you want to store your filter's state, you need to use Flink's state API [1].
2. Unless you use Flink's state API, Flink won't be able to recover the numElementsSeen field. After a failure, the filter is deserialized again from the JobGraph, so numElementsSeen starts over from its initial value.
3. I think stateful filters are fine if your filter genuinely needs to be stateful. Statefulness usually complicates things, so if your function can be stateless, I would recommend making it stateless. However, some applications strictly require statefulness.
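To make points 1 and 2 concrete, here is a plain-JDK sketch (no Flink dependency) that only simulates the mechanism: Flink user functions implement java.io.Serializable and are shipped to the tasks in serialized form, and after a failure each task deserializes a fresh copy from those same bytes. The class names and the double threshold are hypothetical stand-ins for Tim's ThresholdFilter and MyThreshold; this is not Flink's actual recovery code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

    // Hypothetical stand-in for the user's filter: a final configuration field
    // set in the constructor plus a mutable counter updated at runtime.
    static class ThresholdFilter implements Serializable {
        private static final long serialVersionUID = 1L;

        private final double threshold;
        private int numElementsSeen;

        ThresholdFilter(double threshold) {
            this.threshold = threshold;
        }

        // Keeps values at or above the threshold and counts every element seen.
        boolean filter(double value) {
            numElementsSeen++;
            return value >= threshold;
        }

        double getThreshold() { return threshold; }
        int getNumElementsSeen() { return numElementsSeen; }
    }

    // Java-serializes an object, roughly what happens to the function at job submission.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Creates a fresh copy from the serialized bytes, as a (re)started task would.
    static <T> T deserialize(byte[] data, Class<T> type) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] submitted = serialize(new ThresholdFilter(10.0));

        ThresholdFilter running = deserialize(submitted, ThresholdFilter.class);
        running.filter(5.0);
        running.filter(15.0);
        System.out.println("before failure: " + running.getNumElementsSeen());

        // Simulated failure: the running copy is discarded and a new one is
        // deserialized from the same submitted bytes.
        ThresholdFilter recovered = deserialize(submitted, ThresholdFilter.class);
        System.out.println("threshold after recovery: " + recovered.getThreshold());
        System.out.println("numElementsSeen after recovery: " + recovered.getNumElementsSeen());
    }
}
```

Running main prints a count of 2 before the simulated failure, then a threshold of 10.0 and a count of 0 after recovery: the constructor argument survives because it is part of the serialized function, while the runtime counter does not. The same mechanism means every parallel subtask works on its own deserialized copy, so a plain numElementsSeen field only counts the elements seen by that one subtask.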
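For point 2, one way to make numElementsSeen survive a failure is operator (non-keyed) state through CheckpointedFunction, using a common pattern: keep a working copy in a plain field and copy it into managed state on each checkpoint. This is a sketch assuming the Flink DataStream API (flink-streaming-java) is on the classpath; the Double element type, the double threshold, and the class name CheckpointedThresholdFilter are hypothetical stand-ins for the elided parts of the original filter.

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical variant of the filter: elements are Doubles and the threshold is a
// plain double, because the original MyThreshold type and filter body are not shown.
class CheckpointedThresholdFilter implements FilterFunction<Double>, CheckpointedFunction {

    // Serialized with the function itself, so it is always available after recovery.
    private final double threshold;

    // Working copy used on the hot path; restored from state in initializeState().
    private long numElementsSeen;

    // Managed operator state that Flink includes in each checkpoint. Transient because
    // it is obtained at runtime, not shipped with the serialized function.
    private transient ListState<Long> checkpointedCount;

    CheckpointedThresholdFilter(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(Double value) {
        numElementsSeen++;
        return value >= threshold;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current count.
        checkpointedCount.clear();
        checkpointedCount.add(numElementsSeen);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("numElementsSeen", Long.class));
        if (context.isRestored()) {
            // On rescaling, list entries are redistributed across subtasks,
            // so sum whatever entries this subtask receives.
            numElementsSeen = 0;
            for (Long count : checkpointedCount.get()) {
                numElementsSeen += count;
            }
        }
    }

    long getNumElementsSeen() {
        return numElementsSeen;
    }
}
```

The state is only snapshotted if checkpointing is enabled, for example with env.enableCheckpointing(10_000) on the StreamExecutionEnvironment; the filter is then applied as usual with stream.filter(new CheckpointedThresholdFilter(10.0)). If the count should be tracked per key rather than per parallel instance, keyed state (for example a ValueState<Long> obtained in a RichFilterFunction on a keyed stream) would be the more natural fit.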


Cheers,
Till

(In reply to Timothy Victor, Thu, Nov 7, 2019, 2:11 PM)

Re: Flink Filters have state?

Timothy Victor
Thanks, Till! This was helpful!

Tim

(In reply to Till Rohrmann, Fri, Nov 8, 2019, 7:16 AM)