I call Pandas UDF N times, do I have to initiate the UDF N times?


Yik San Chan
Hi community,

I am using PyFlink and Pandas UDF in my job.

The job executes a SQL like this:

```
SELECT
LABEL_ENCODE(a),
LABEL_ENCODE(b),
LABEL_ENCODE(c)
...
```

And my LABEL_ENCODE UDF is defined below:

```
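import logging
from pyflink.table.udf import ScalarFunction, udf

class LabelEncode(ScalarFunction):
  def open(self, function_context):
    logging.info("LabelEncode.open")
    self.encoder = load_encoder()  # performs I/O each time open() runs

  def eval(self, x):
    ...

labelEncode = udf(LabelEncode(), ...)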
```

When I run the job, according to the TaskManager log, "LabelEncode.open" is printed 3 times, which is exactly the number of times the LABEL_ENCODE UDF is called.

Since every call to LabelEncode.open triggers I/O (load_encoder() does so), I wonder whether I can initialize the UDF only once and use it 3 times?

Thank you!

Best,
Yik San

Re: I call Pandas UDF N times, do I have to initiate the UDF N times?

Dian Fu
Hi Yik San,

Would it be acceptable to rewrite the UDF a bit so that it accepts multiple parameters, and then rewrite the program as follows:

```
SELECT
LABEL_ENCODE(a, b, c)
...
```
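
A minimal sketch of what such a multi-parameter UDF might look like, written in plain Python so that it runs without a Flink cluster. The fallback `ScalarFunction` stub, the dictionary-based `load_encoder()`, the class name `LabelEncodeMulti`, and the list-based `eval` are all assumptions made for illustration; a real Pandas UDF would work on `pandas.Series` arguments and would need a result type that can carry three values.

```python
import logging

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Stand-in base class so the sketch runs without PyFlink (assumption).
    class ScalarFunction:
        def open(self, function_context):
            pass

LOAD_COUNT = 0  # counts how often the hypothetical encoder is loaded


def load_encoder():
    """Hypothetical loader; the original load_encoder() performs I/O."""
    global LOAD_COUNT
    LOAD_COUNT += 1
    return {"red": 0, "green": 1, "blue": 2}


class LabelEncodeMulti(ScalarFunction):
    def open(self, function_context):
        logging.info("LabelEncodeMulti.open")
        self.encoder = load_encoder()  # loaded once, shared by all columns

    def eval(self, a, b, c):
        # Encode every column with the same encoder instance.
        return [[self.encoder[v] for v in col] for col in (a, b, c)]


f = LabelEncodeMulti()
f.open(None)  # a single open() call serves all three columns
encoded = f.eval(["red"], ["green"], ["blue"])
```

Because there is only one function instance in the query, `open` runs once regardless of how many columns are passed in.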

Regards,
Dian

(In reply to Yik San Chan's message of May 8, 2021, 11:56 AM.)


Re: I call Pandas UDF N times, do I have to initiate the UDF N times?

Yik San Chan
Hi Dian,

Thanks for pointing that out; it is a workaround that I have also considered.

I wonder whether there is a framework-level optimization for this, so that a UDF is initialized only once, no matter how many times it is called?

Thank you!

Best,
Yik San

(In reply to Dian Fu's message of Sat, May 8, 2021 at 1:32 PM.)

Re: I call Pandas UDF N times, do I have to initiate the UDF N times?

Dian Fu
There is no such optimization at the framework level yet. However, I think this may be a good point that we could optimize. Would you like to create a ticket for this?
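
Until such an optimization exists, one possible user-level workaround is to cache the loaded encoder at module level, so that several UDF instances living in the same Python process share one copy. The sketch below is plain Python: the fallback `ScalarFunction` stub, the dictionary returned by `load_encoder()`, and the assumption that all three UDF instances run in the same Python worker process are illustrative and not confirmed PyFlink behavior.

```python
import functools
import logging

try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    # Stand-in base class so the sketch runs without PyFlink (assumption).
    class ScalarFunction:
        def open(self, function_context):
            pass

LOAD_COUNT = 0  # counts how often the hypothetical encoder is really loaded


@functools.lru_cache(maxsize=None)
def load_encoder():
    """Hypothetical loader; cached so the expensive I/O happens only once."""
    global LOAD_COUNT
    LOAD_COUNT += 1
    return {"red": 0, "green": 1, "blue": 2}


class LabelEncode(ScalarFunction):
    def open(self, function_context):
        logging.info("LabelEncode.open")
        self.encoder = load_encoder()  # cache hit after the first call

    def eval(self, x):
        return [self.encoder[v] for v in x]


# Three instances, mirroring the three LABEL_ENCODE calls in the query.
udfs = [LabelEncode() for _ in range(3)]
for u in udfs:
    u.open(None)
```

`open` still runs three times here, but only the first call pays for the load. The cached object is shared, so it should be treated as read-only.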

Regards,
Dian

(In reply to Yik San Chan's message of May 8, 2021, 2:27 PM.)


Re: I call Pandas UDF N times, do I have to initiate the UDF N times?

Yik San Chan
Hi Dian,

Thanks for the confirmation. I have created a ticket: https://issues.apache.org/jira/browse/FLINK-22605

Best,
Yik San

(In reply to Dian Fu's message of Sat, May 8, 2021 at 2:32 PM.)

Re: I call Pandas UDF N times, do I have to initiate the UDF N times?

Dian Fu
Thanks a lot~

(In reply to Yik San Chan's message of May 8, 2021, 2:41 PM.)
