Open Method is not being called in case of AggregateFunction UDFs

Open Method is not being called in case of AggregateFunction UDFs

Arujit Pradhan
Hi all,

We are creating some user-defined functions of type AggregateFunction, and we want to report some static metrics from the open() method of the UDFs, because the MetricGroup is obtained from the FunctionContext, which is only exposed in open(). Our code looks something like this (it is an implementation of COUNT DISTINCT in SQL):

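import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;

// COUNT(DISTINCT item) as a UDAF; DistinctCountAccumulator is our own class.
public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
    @Override
    public DistinctCountAccumulator createAccumulator() {
        return new DistinctCountAccumulator();
    }

    // Expected to run once before evaluation; the FunctionContext (and with it
    // the MetricGroup) is only available here.
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        MetricGroup metricGroup = context.getMetricGroup();
        // add some metric to the group here
        System.out.println("in the open of UDF");
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
        System.out.println("in the udf");
        return distinctCountAccumulator.count();
    }

    // Called for every input row; null items are not counted.
    public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
        if (item == null) {
            return;
        }
        distinctCountAccumulator.add(item);
    }
}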
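The post does not include DistinctCountAccumulator. A minimal, hypothetical version that matches the add()/count() calls above, written in plain Java with a HashSet, might look like this:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator: the post does not show DistinctCountAccumulator,
// so this only mirrors the add()/count() calls that DistinctCount makes.
class DistinctCountAccumulator {
    // Distinct items seen so far; adding an item that is already present is a no-op.
    private final Set<String> seen = new HashSet<>();

    public void add(String item) {
        seen.add(item);
    }

    public int count() {
        return seen.size();
    }
}

class DistinctCountAccumulatorDemo {
    public static void main(String[] args) {
        DistinctCountAccumulator acc = new DistinctCountAccumulator();
        acc.add("a");
        acc.add("b");
        acc.add("a"); // duplicate, not counted again
        System.out.println(acc.count()); // prints 2
    }
}
```

This sketch keeps every distinct item in memory and ignores Flink's serialization requirements for accumulators (for example its POJO rules); a real accumulator would need to address both.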

But when we use this UDF in Flink SQL, the open() method does not seem to be called at all.

In the Flink UDF documentation we find:

The open() method is called once before the evaluation method. The close() method after the last call to the evaluation method.

The open() method provides a FunctionContext that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.
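The contract quoted above can be illustrated with a small, hypothetical plain-Java model. TracingAggregate and DocumentedLifecycle are made-up names, not Flink classes; the harness simply calls open() once, then the evaluation methods, then close(), and records the order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a UDAF that records each lifecycle call (not a Flink class).
class TracingAggregate {
    final List<String> trace = new ArrayList<>();
    private int count;

    void open() { trace.add("open"); }

    void accumulate(String item) {
        trace.add("accumulate");
        count++;
    }

    int getValue() {
        trace.add("getValue");
        return count;
    }

    void close() { trace.add("close"); }
}

class DocumentedLifecycle {
    // Calls open() once before evaluation and close() after the last evaluation call,
    // mirroring the order described in the documentation quoted above.
    static int run(TracingAggregate fn, List<String> items) {
        fn.open();
        for (String item : items) {
            fn.accumulate(item);
        }
        int result = fn.getValue();
        fn.close();
        return result;
    }

    public static void main(String[] args) {
        TracingAggregate fn = new TracingAggregate();
        int result = run(fn, Arrays.asList("a", "b"));
        // prints: 2 [open, accumulate, accumulate, getValue, close]
        System.out.println(result + " " + fn.trace);
    }
}
```

In a real streaming job getValue() can be called many times between open() and close(); the model only captures that open() is expected to come first.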

Is there a reason why open() is not working in AggregateFunctions? By the way, it works fine for ScalarFunctions. Is there any alternative place where we can register some static metrics in a UDF?


Thanks and regards,
Arujit

Re: Open Method is not being called in case of AggregateFunction UDFs

Dawid Wysakowicz

Hi Arujit,

Could you also share the query where you use this UDF? It would also help if you said which version of Flink you are using and which planner.

Best,

Dawid

On 11/12/2019 10:21, Arujit Pradhan wrote:

Re: Open Method is not being called in case of AggregateFunction UDFs

Timo Walther
I remember that we recently fixed a bug around this topic. The legacy
planner should not be affected.

There is another user reporting this:
https://issues.apache.org/jira/browse/FLINK-15040

Regards,
Timo

On 11.12.19 10:34, Dawid Wysakowicz wrote:

Re: Open Method is not being called in case of AggregateFunction UDFs

Timo Walther
At least I hope it has been fixed. Which version and planner are you using?


On 11.12.19 11:47, Arujit Pradhan wrote:

> Hi Timo,
>
> Thanks for the bug reference.
>
> You mentioned that this bug has been fixed. Is the fix available for
> Flink 1.9+ and the default query planner?
>
> Thanks and regards,
> Arujit
>
> On Wed, Dec 11, 2019 at 3:56 PM Timo Walther wrote:

Re: Open Method is not being called in case of AggregateFunction UDFs

Jark Wu
Hi Arujit,

Thanks for reporting this. Are you using this UDF in a window aggregation with the old planner?
AFAIK, the only case where open() of a UDAF is not called is window aggregation in the old planner,
because the old planner uses the DataStream WindowOperator, which does not call open() on the AggregateFunction [1].

I also tested it on the master branch: it works for the other aggregations (e.g. over aggregations and group aggregations) in the old planner,
and for all aggregations in the Blink planner.

If you are using v1.9, you can switch to the Blink planner and give it a try.
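The explanation above can be sketched as a hypothetical, heavily simplified plain-Java model. ModelOperator, ForwardingOperator and NonForwardingOperator are made-up names, not Flink's operators; the point is only that the user function's open() runs solely if the wrapping operator forwards the call, which, as described above, the old planner's window path does not:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model; none of these types are Flink classes.
interface LifecycleFunction {
    void open();
    void accumulate(String item);
}

// Records which methods were called on it.
class RecordingFunction implements LifecycleFunction {
    final List<String> calls = new ArrayList<>();

    public void open() { calls.add("open"); }

    public void accumulate(String item) { calls.add("accumulate:" + item); }
}

// An operator owns the user function and decides whether to forward open() to it.
abstract class ModelOperator {
    protected final LifecycleFunction fn;

    ModelOperator(LifecycleFunction fn) { this.fn = fn; }

    abstract void open();

    void processElement(String item) { fn.accumulate(item); }
}

// Stands for the paths reported to work (group/over aggregations, Blink planner).
class ForwardingOperator extends ModelOperator {
    ForwardingOperator(LifecycleFunction fn) { super(fn); }

    @Override
    void open() { fn.open(); }
}

// Stands for the old-planner window path as described: the operator itself is
// opened, but open() is never called on the wrapped AggregateFunction.
class NonForwardingOperator extends ModelOperator {
    NonForwardingOperator(LifecycleFunction fn) { super(fn); }

    @Override
    void open() {
        // intentionally does not call fn.open()
    }
}

class OpenForwardingDemo {
    public static void main(String[] args) {
        RecordingFunction a = new RecordingFunction();
        RecordingFunction b = new RecordingFunction();
        ModelOperator[] ops = {new ForwardingOperator(a), new NonForwardingOperator(b)};
        for (ModelOperator op : ops) {
            op.open();
            op.processElement("x");
        }
        System.out.println(a.calls); // [open, accumulate:x]
        System.out.println(b.calls); // [accumulate:x]
    }
}
```

In both cases the evaluation path still works, which matches the symptom in the original post: results are produced, but code placed in open() (such as metric registration) never runs.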

Best,
Jark


On Wed, 11 Dec 2019 at 19:01, Timo Walther wrote: