How to load resource in a PyFlink UDF

Yik San Chan
Hi,

My UDF depends on a resource file, crypt.csv, located in the resources/ directory:

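```python
# udf_use_resource.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    import pandas as pd
    # Load crypt.csv (key,value rows, no header) as a dict; the relative path
    # is resolved against the working directory of the process running the UDF.
    d = pd.read_csv('resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
    return d.get(s, "unknown")
```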

The job runs without any problem in local mode (i.e., `python udf_use_resource.py`). However, when I submit it to my local cluster with `~/softwares/flink-1.12.0/bin/flink run -d -pyexec /usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip -py udf_use_resource.py`, it fails with:

FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist: b'resources/crypt.csv'

resources.zip was created by zipping the resources directory. Where did I go wrong?
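The layout inside the archive matters, because the path inside the zip becomes part of the path the UDF has to open. As a minimal standard-library sketch, assuming the goal is the same layout you get by zipping the `resources` directory itself (a top-level `resources/` folder inside the zip), the archive could be built like this; `zip_directory` and `demo` are hypothetical helper names.

```python
import os
import tempfile
import zipfile


def zip_directory(src_dir, archive_path):
    """Zip src_dir so that each entry keeps src_dir's own name as its top-level
    folder (e.g. resources/crypt.csv), then return the archive's entry names."""
    parent = os.path.dirname(os.path.abspath(src_dir))
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in sorted(files):
                full_path = os.path.join(root, name)
                # The arcname is relative to src_dir's parent, so it starts
                # with "resources/" rather than just "crypt.csv".
                zf.write(full_path, arcname=os.path.relpath(full_path, parent))
    with zipfile.ZipFile(archive_path) as zf:
        return zf.namelist()


def demo():
    """Build a throwaway resources/crypt.csv in a temp dir and zip it."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "resources")
        os.makedirs(src)
        with open(os.path.join(src, "crypt.csv"), "w") as f:
            f.write("abc,hello\n")
        return zip_directory(src, os.path.join(tmp, "resources.zip"))
```

If the archive were instead built from inside the directory (entries like `crypt.csv` at the zip root), the `resources/` segment would disappear from the path the UDF needs.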

Note: udf_use_resource.py and resources/crypt.csv can be found at https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71.

Thanks!

Best,
Yik San

Re: How to load resource in a PyFlink UDF

Dian Fu
Hi Yik San,

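Could you try `pd.read_csv('resources.zip/resources/crypt.csv', xxx)`? Archives passed via `-pyarch` are extracted into a directory with the same name as the archive file (here, resources.zip) in the working directory of the Python UDF worker.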
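To illustrate why the `resources.zip/` prefix is needed, the sketch below mimics what happens on the Python worker under two assumptions: the archive is unpacked into a directory named after the archive file, and relative paths resolve against the worker's working directory. `extract_like_pyarch`, `lookup`, and `demo` are hypothetical names, this is not Flink's implementation, and the standard `csv` module stands in for `pd.read_csv` to keep the example dependency-free.

```python
import csv
import os
import tempfile
import zipfile


def extract_like_pyarch(archive_path, work_dir, target_dir=None):
    """Approximate -pyarch on the worker: unpack the archive into
    work_dir/<target_dir>, where target_dir defaults to the archive's
    file name (e.g. "resources.zip"). Not Flink's actual code."""
    target = os.path.join(work_dir, target_dir or os.path.basename(archive_path))
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(target)
    return target


def lookup(csv_path, key):
    """Stand-in for the UDF body: build a key -> value dict from a
    two-column CSV without a header, then look up key."""
    with open(csv_path, newline="") as f:
        table = {row[0]: row[1] for row in csv.reader(f) if row}
    return table.get(key, "unknown")


def demo():
    with tempfile.TemporaryDirectory() as work_dir:
        # Build resources.zip (containing resources/crypt.csv) outside work_dir's root.
        archive = os.path.join(work_dir, "upload", "resources.zip")
        os.makedirs(os.path.dirname(archive))
        with zipfile.ZipFile(archive, "w") as zf:
            zf.writestr("resources/crypt.csv", "abc,hello\nxyz,world\n")
        extract_like_pyarch(archive, work_dir)
        old_cwd = os.getcwd()
        os.chdir(work_dir)  # the UDF's relative paths resolve against this
        try:
            original_path_missing = not os.path.exists("resources/crypt.csv")
            value = lookup("resources.zip/resources/crypt.csv", "abc")
        finally:
            os.chdir(old_cwd)
        return original_path_missing, value
```

Here `demo()` should report that `resources/crypt.csv` does not exist while `resources.zip/resources/crypt.csv` does; in the real UDF, that second relative path is what would be passed to `pd.read_csv`.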

Regards,
Dian

(In reply to Yik San Chan <[hidden email]>, Apr 27, 2021, 4:39 PM)

Re: How to load resource in a PyFlink UDF

Yik San Chan
Hi Dian,

Thank you! That solved my problem. By the way, for my use case, does `-pyarch` make more sense than `-pyfs`?

Best,
Yik San

(In reply to Dian Fu <[hidden email]>, Tue, Apr 27, 2021, 4:52 PM)

Re: How to load resource in a PyFlink UDF

Dian Fu
Hi Yik San,

The command-line option `-pyarch` can be used to specify archive files such as a Python virtual environment, ML models, data files, etc.

So for resources.zip, `-pyarch` makes more sense than `-pyfs`.
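According to the Flink CLI help text for `-pyarch`, `#` separates an archive path from an optional target directory name, multiple archives are comma-separated, and without a target the archive is extracted into a directory named after the archive file. The hypothetical parser below (`pyarch_layout` and `udf_path` are illustrative names, not Flink's own code) only shows that naming rule, i.e. which directory prefix a UDF would need.

```python
import os


def pyarch_layout(spec):
    """Map a -pyarch value such as "resources.zip,/tmp/data.zip#data" to
    {extraction_dir: archive_path}, following the naming rule described
    in the Flink CLI help. Illustrative only; not Flink's parser."""
    layout = {}
    for item in spec.split(","):
        archive, sep, target = item.partition("#")
        # No "#target": the directory is named after the archive file itself.
        extraction_dir = target if sep else os.path.basename(archive)
        layout[extraction_dir] = archive
    return layout


def udf_path(extraction_dir, path_in_archive):
    """Relative path a UDF would open from the worker's working directory."""
    return f"{extraction_dir}/{path_in_archive}"
```

For example, submitting with `-pyarch resources.zip#res` should, under this rule, make the file readable as `res/resources/crypt.csv`.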

Regards,
Dian

(In reply to Yik San Chan <[hidden email]>, Apr 27, 2021, 5:14 PM)

Re: How to load resource in a PyFlink UDF

Yik San Chan
Hi Dian,

Thank you for the detailed answer!

Best,
Yik San

(In reply to Dian Fu <[hidden email]>, Tue, Apr 27, 2021, 5:42 PM)