PyFlink UDF: When to use vectorized vs scalar


PyFlink UDF: When to use vectorized vs scalar

Yik San Chan
The question is cross-posted on Stack Overflow https://stackoverflow.com/questions/67122265/pyflink-udf-when-to-use-vectorized-vs-scalar

Is there a simple set of rules to follow when deciding between vectorized and scalar PyFlink UDFs?

According to the [docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html), vectorized UDFs have two advantages: (1) smaller serialization/deserialization and invocation overhead, and (2) vector calculations are highly optimized thanks to libraries such as NumPy.

> Vectorized Python user-defined functions are functions which are executed by transferring a batch of elements between JVM and Python VM in Arrow columnar format. The performance of vectorized Python user-defined functions are usually much higher than non-vectorized Python user-defined functions as the serialization/deserialization overhead and invocation overhead are much reduced. Besides, users could leverage the popular Python libraries such as Pandas, Numpy, etc for the vectorized Python user-defined functions implementation. These Python libraries are highly optimized and provide high-performance data structures and functions.

**QUESTION 1**: Is vectorized UDF ALWAYS preferred?

Let's say, in my use case, I simply want to extract some fields from a JSON column, which is not yet supported by Flink [built-in functions](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/systemFunctions.html), so I need to define my UDF like:

```python
import json

@udf(...)
def extract_field_from_json(json_value, field_name):
    return json.loads(json_value)[field_name]
```

**QUESTION 2**: Will I also benefit from vectorized UDF in this case?

Best,
Yik San

Re: PyFlink UDF: When to use vectorized vs scalar

Fabian Paul
Hi Yik San,

I think whether vectorized UDFs pay off depends highly on your input and output formats. For your example, my first impression is that parsing a JSON string is always a rather expensive operation, and vectorization has little impact on that.

I am cc'ing Dian Fu, who is more familiar with PyFlink.

Best,
Fabian

On 16. Apr 2021, at 11:04, Yik San Chan <[hidden email]> wrote:


Re: PyFlink UDF: When to use vectorized vs scalar

Dian Fu
Hi Yik San,

It depends largely on what you want to do in your Python UDF implementation. As you know, for vectorized Python UDFs (aka Pandas UDFs), the input data are organized in a columnar format. So if your Python UDF implementation can benefit from this, e.g. by making use of the functionality provided in columnar-oriented libraries such as Pandas and NumPy, then a vectorized Python UDF is usually the better choice. However, if you have to operate on the input data one row at a time, then I guess a non-vectorized Python UDF is enough.

PS: you could also run some performance tests when it's unclear which one is better.

Regards,
Dian
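A minimal sketch of the two styles, written in plain pandas so it runs without a Flink cluster. In PyFlink the scalar version would be registered with `@udf(result_type=...)` and the vectorized one with `@udf(result_type=..., func_type="pandas")`; the decorators and their arguments are omitted here, and the `plus_one` functions are hypothetical examples, not from the thread.

```python
import pandas as pd

def plus_one_scalar(x):
    # Scalar UDF style: invoked once per row, x is a single value.
    return x + 1

def plus_one_vectorized(x: pd.Series) -> pd.Series:
    # Pandas UDF style: invoked once per batch, x is a whole column;
    # the addition runs element-wise in NumPy, not in a per-row Python loop.
    return x + 1

batch = pd.Series([1, 2, 3])
scalar_result = [int(plus_one_scalar(v)) for v in batch]  # row-at-a-time
vectorized_result = plus_one_vectorized(batch).tolist()   # whole batch at once
```

Both produce the same values; the difference is how many times the Python function is crossed into per batch.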

On 16 Apr 2021, at 20:24, Fabian Paul <[hidden email]> wrote:


Re: PyFlink UDF: When to use vectorized vs scalar

Yik San Chan
Hi Fabian and Dian,

Thanks for the replies. They make sense.

Best,
Yik San

On Mon, Apr 19, 2021 at 9:49 AM Dian Fu <[hidden email]> wrote:

Re: PyFlink UDF: When to use vectorized vs scalar

Yik San Chan
Hmm, one more question. As I said, there are two gains from using Pandas UDFs: (1) smaller ser-de and invocation overhead, and (2) vector calculation.

(2) depends on the use case; how about (1)? Does benefit (1) always hold?

Best,
Yik San

On Mon, Apr 19, 2021 at 4:33 PM Yik San Chan <[hidden email]> wrote:

Re: PyFlink UDF: When to use vectorized vs scalar

Dian Fu
I have not tested this, so I have no direct answer to this question.

There are some tricky things behind it. For a Pandas UDF, the input data are organized in a columnar format. However, if the Pandas UDF takes multiple input arguments and your implementation accesses the data row by row, cache locality may become a problem: to process the i-th row, you need to fetch the element at position i from each of the columnar data structures.

Regards,
Dian
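To illustrate that row-basis access pattern, here is a plain-pandas sketch (no Flink runtime needed; the functions are hypothetical) of the same computation written row-at-a-time versus column-at-a-time. Both give the same result, but only the second scans each column's contiguous buffer in one pass.

```python
import pandas as pd

def add_rowwise(a: pd.Series, b: pd.Series) -> pd.Series:
    # Row-at-a-time over columnar inputs: processing the i-th row touches
    # position i of each separate column buffer, which hurts cache locality
    # (and pays Python-loop overhead on top).
    return pd.Series([x + y for x, y in zip(a, b)])

def add_columnwise(a: pd.Series, b: pd.Series) -> pd.Series:
    # Whole-column arithmetic stays within each contiguous buffer.
    return a + b

a = pd.Series([1.0, 2.0, 3.0])
b = pd.Series([10.0, 20.0, 30.0])
rowwise = add_rowwise(a, b)
colwise = add_columnwise(a, b)
```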

On 19 Apr 2021, at 16:40, Yik San Chan <[hidden email]> wrote:


Re: PyFlink UDF: When to use vectorized vs scalar

Yik San Chan
Hi Dian,

By "access data at row basis", do you mean, for input X:

```python
for row in X:
    doSomething(row)
```

If that's the case, I believe I am not accessing the vector like that. What I do is pretty much, for inputs X1, X2 and X3:

```python
model = ...
predictions = model.predict(X1, X2, X3)
```

Do I understand it correctly?

Best,
Yik San 

On Mon, Apr 19, 2021 at 7:45 PM Dian Fu <[hidden email]> wrote:

Re: PyFlink UDF: When to use vectorized vs scalar

Dian Fu
Yes, your understanding is correct.

So model.predict accepts pandas.Series as inputs? If that's the case, then I guess a Pandas UDF is a perfect choice for your requirements.

Regards,
Dian
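A sketch of that pattern, with a stand-in model since the real one is not shown in the thread. The Pandas UDF receives each input column as a pandas.Series and hands all of them to predict in a single call per batch; in PyFlink the function would be registered with `func_type="pandas"` as hinted in the comment.

```python
import pandas as pd

class DummyModel:
    """Stand-in for the real model; predict() accepts column vectors."""
    def predict(self, x1, x2, x3):
        # Series arithmetic: vectorized over the whole batch.
        return 0.5 * x1 + 0.3 * x2 + 0.2 * x3

model = DummyModel()

# In PyFlink this would be registered as a vectorized UDF, e.g.:
# predict_udf = udf(predict, result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(x1: pd.Series, x2: pd.Series, x3: pd.Series) -> pd.Series:
    # Each argument is a whole column; one predict() call per batch.
    return model.predict(x1, x2, x3)

preds = predict(pd.Series([1.0]), pd.Series([2.0]), pd.Series([3.0]))
```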

On 19 Apr 2021, at 20:23, Yik San Chan <[hidden email]> wrote:


Re: PyFlink UDF: When to use vectorized vs scalar

Yik San Chan
Hi Dian,

Thank you!

Best,
Yik San

On Tue, Apr 20, 2021 at 9:24 AM Dian Fu <[hidden email]> wrote: