Filter push-down not working for a custom BatchTableSource


Josh Bradt
Hi all,

I'm trying to implement filter push-down on a custom BatchTableSource that retrieves data from a REST API and returns it as POJO instances. I've implemented FilterableTableSource as described in the docs, returning a new instance of my table source containing the predicates that I've removed from the list of predicates passed into applyPredicate. However, when getDataSet is eventually called, it's called on the instance of the table source that was originally registered with the table environment, which does not have any filters in it. I've stepped through the code in a debugger: applyPredicate is definitely being called, and it's definitely returning new instances of my table source, but those instances don't seem to be used.

I also played with the OrcTableSource, which is the only example of a push-down filter implementation I could find, and it doesn't behave this way. When I set a breakpoint in getDataSet in that case, it's being called on one of the new instances of the table source that contains the accepted filters.

Are there any other requirements for implementing push-down filters that aren't listed in the docs? Or does anyone have any tips for this?

Thanks,

Josh

--
Josh Bradt
Software Engineer
225 Franklin St, Boston, MA 02110
klaviyo.com

Re: Filter push-down not working for a custom BatchTableSource

Fabian Hueske-2
Hi Josh,

Does your TableSource also implement ProjectableTableSource?
If yes, you need to make sure that the filter information is also forwarded if ProjectableTableSource.projectFields() is called after FilterableTableSource.applyPredicate().
Also make sure to correctly implement FilterableTableSource.isFilterPushedDown().
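
To illustrate the two points above, here is a minimal sketch of a source that carries its accepted filters through a later projectFields() call and reports isFilterPushedDown() accordingly. It is a stand-alone model, not Flink code: SketchTableSource, its string-based filters, and the getter names are assumptions made up for this example, and only the method names applyPredicate, projectFields, and isFilterPushedDown are taken from the thread.

```java
import java.util.Arrays;
import java.util.List;

class ForwardFiltersDemo {
    public static void main(String[] args) {
        // Filter first, then project: the filters must survive the projection.
        SketchTableSource source = new SketchTableSource()
            .applyPredicate(List.of("id > 10"))
            .projectFields(new int[] {0, 2});

        System.out.println(source.isFilterPushedDown());                  // true
        System.out.println(source.getFilters());                          // [id > 10]
        System.out.println(Arrays.toString(source.getProjectedFields())); // [0, 2]
    }
}

// Hypothetical stand-in for a source that supports both projection and
// filter push-down. It mirrors the method names from the thread but does
// NOT implement the real Flink interfaces.
final class SketchTableSource {
    private final int[] projectedFields; // null means "all fields"
    private final List<String> filters;  // null means "applyPredicate not called yet"

    SketchTableSource() {
        this(null, null);
    }

    private SketchTableSource(int[] projectedFields, List<String> filters) {
        this.projectedFields = projectedFields;
        this.filters = filters;
    }

    // Returns a copy that remembers the accepted filters and keeps the projection.
    SketchTableSource applyPredicate(List<String> accepted) {
        return new SketchTableSource(projectedFields, List.copyOf(accepted));
    }

    // Returns a copy with the new projection; the filters are forwarded unchanged.
    SketchTableSource projectFields(int[] fields) {
        return new SketchTableSource(fields.clone(), filters);
    }

    // True once applyPredicate has produced this instance (directly or indirectly).
    boolean isFilterPushedDown() {
        return filters != null;
    }

    List<String> getFilters() {
        return filters == null ? List.of() : filters;
    }

    int[] getProjectedFields() {
        return projectedFields == null ? null : projectedFields.clone();
    }
}
```

The design choice is simply that every copy constructor call passes along all state, so the order in which the planner applies the two push-downs does not matter.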

Hope this helps,
Fabian


Re: Filter push-down not working for a custom BatchTableSource

Josh Bradt
Hi Fabian,

Thanks for your reply. My custom table source does not implement ProjectableTableSource. I believe that isFilterPushedDown is implemented correctly since it's nearly identical to what's written in the OrcTableSource. I pasted a slightly simplified version of the implementation below. If you wouldn't mind reading over it, is there anything obviously wrong?

public final class CustomerTableSource implements BatchTableSource<Customer>,
        FilterableTableSource<Customer> {

    // Iterator that gets data from a REST API as POJO instances
    private final AppResourceIterator<Customer> resourceIterator;
    private final String tableName;
    private final Class<Customer> modelClass;
    private final AppRequestFilter[] filters;

    public CustomerTableSource(
            AppResourceIterator<Customer> resourceIterator,
            String tableName,
            Class<Customer> modelClass) {

        this(resourceIterator, tableName, modelClass, null);
    }

    protected CustomerTableSource(
            AppResourceIterator<Customer> resourceIterator,
            String tableName,
            Class<Customer> modelClass,
            AppRequestFilter[] filters) {

        this.resourceIterator = resourceIterator;
        this.tableName = tableName;
        this.modelClass = modelClass;
        this.filters = filters;
    }

    @Override
    public TableSource<Customer> applyPredicate(List<Expression> predicates) {
        List<Expression> acceptedPredicates = new ArrayList<>();
        List<AppRequestFilter> acceptedFilters = new ArrayList<>();

        for (final Expression predicate : predicates) {
            buildFilterForPredicate(predicate).ifPresent(filter -> {
                acceptedFilters.add(filter);
                acceptedPredicates.add(predicate);
            });
        }

        predicates.removeAll(acceptedPredicates);

        return new CustomerTableSource(
            resourceIterator.withFilters(acceptedFilters),
            tableName,
            modelClass,
            acceptedFilters.toArray(new AppRequestFilter[0])
        );
    }

    public Optional<AppRequestFilter> buildFilterForPredicate(Expression predicate) {
        // Code for translating an Expression into an AppRequestFilter
        // Returns Optional.empty() for predicates we don't want to / can't apply
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public DataSet<Customer> getDataSet(ExecutionEnvironment execEnv) {
        return execEnv.fromCollection(resourceIterator, modelClass);
    }

    @Override
    public TypeInformation<Customer> getReturnType() {
        return TypeInformation.of(modelClass);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }
}

Thanks,

Josh


Re: Filter push-down not working for a custom BatchTableSource

Fabian Hueske-2
Hi Josh,

The code looks good to me.
This seems to be a bug then.
It's strange that it works for ORC.

Would you mind opening a Jira ticket, and maybe adding a simple reproducible code example?

Thank you,
Fabian


Re: Filter push-down not working for a custom BatchTableSource

Josh Bradt
Hi Fabian,

Thanks for taking a look. I've filed this ticket: https://issues.apache.org/jira/browse/FLINK-12399

Thanks,

Josh


Re: Filter push-down not working for a custom BatchTableSource

Rong Rong
Hi Josh,

I think I found the root cause of this issue (please see my comment in https://issues.apache.org/jira/browse/FLINK-12399).
For now, you can try overriding the explainSource() method to let Calcite know that the table source returned by applyPredicate is different from the one it was called on.
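
The effect of this workaround can be sketched with a toy model. The code below is not Flink or Calcite code: ToySource and ToyPlanner are hypothetical classes, and the assumption that the planner treats two scans with the same explainSource() string as the same node is a simplification of the behaviour discussed in the Jira ticket. It only shows why a description that omits the filters can cause the filtered instance to be dropped in favour of the original one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ExplainSourceDemo {
    public static void main(String[] args) {
        for (boolean describeFilters : new boolean[] {false, true}) {
            ToyPlanner planner = new ToyPlanner();
            ToySource original = new ToySource("customers", List.of(), describeFilters);
            planner.register(original);

            ToySource pushed = original.applyPredicate(List.of("id > 10"));
            ToySource used = planner.register(pushed);

            // Prints "false" when the description ignores the filters and
            // "true" when the filters are part of the description.
            System.out.println(used == pushed);
        }
    }
}

// Hypothetical stand-in for a filterable table source (not the Flink interface).
final class ToySource {
    final String tableName;
    final List<String> filters;
    final boolean describeFilters; // whether explainSource() mentions the filters

    ToySource(String tableName, List<String> filters, boolean describeFilters) {
        this.tableName = tableName;
        this.filters = new ArrayList<>(filters);
        this.describeFilters = describeFilters;
    }

    // Returns a new instance that holds the accepted filters.
    ToySource applyPredicate(List<String> accepted) {
        return new ToySource(tableName, accepted, describeFilters);
    }

    // Description the toy planner uses to tell sources apart.
    String explainSource() {
        if (!describeFilters || filters.isEmpty()) {
            return "ToySource(" + tableName + ")";
        }
        return "ToySource(" + tableName + ", filters=" + filters + ")";
    }
}

// Toy planner that keeps the first source it sees for each description
// (assumption: a simplified picture of how equivalent plan nodes are merged).
final class ToyPlanner {
    private final Map<String, ToySource> byDescription = new HashMap<>();

    ToySource register(ToySource source) {
        return byDescription.computeIfAbsent(source.explainSource(), key -> source);
    }
}
```

In a real table source the equivalent change would be an explainSource() override whose returned string includes the pushed-down filters, so that the instance returned by applyPredicate no longer looks identical to the registered one.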

Let me know if this works for you :-)

Thanks,
Rong
