ModuleNotFound when loading udf from another py file

Yik San Chan
Hi,


I implemented my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and I import the UDF into my main Python file, udf_use_resource.py.

However, after I `flink run` the job, I find the following error in the TaskManager log:

```
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
response = task()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
self.data_channel_factory)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
(transform_id, get_operation(transform_id)) for transform_id in sorted(
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
result = cache[args] = func(*args)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
result = cache[args] = func(*args)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
transform_id, transform_consumers)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
operations.ScalarFunctionOperation)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
super(ScalarFunctionOperation, self).__init__(spec)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
[operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
[operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'decrypt_fun'

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
    ... 1 more
```

Why does this happen? If I move the Decrypt class into udf_use_resource.py, everything works fine.

Thank you!

Best,
Yik San

Re: ModuleNotFound when loading udf from another py file

Dian Fu
Hi Yik San,

From the exception message, it’s clear that the module `decrypt_fun` could not be found during execution.

You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command-line argument. Otherwise, the file will not be available during execution.
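
To make the failure concrete, here is a minimal sketch that uses only the standard library `pickle` module rather than PyFlink itself. It assumes a hypothetical stand-in for `decrypt_fun.py` (a plain `Decrypt` class, not a real `ScalarFunction`) and simulates the worker by removing the module's directory from `sys.path`. Putting the directory back is only an analogy for what shipping the file with -pyfs achieves.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for decrypt_fun.py; the real file extends ScalarFunction.
MODULE_SOURCE = (
    "class Decrypt:\n"
    "    def eval(self, s):\n"
    "        return s[::-1]\n"
)


def demo():
    """Return (error message without the file, result with the file)."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "decrypt_fun.py"), "w") as f:
            f.write(MODULE_SOURCE)

        # "Client" side: the module is importable, so pickling succeeds.
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        module = importlib.import_module("decrypt_fun")
        payload = pickle.dumps(module.Decrypt())

        # "Worker" side without the file: forget the module and where it lives.
        sys.path.remove(tmp)
        del sys.modules["decrypt_fun"]
        importlib.invalidate_caches()
        try:
            pickle.loads(payload)
            without_file = "loaded"
        except ModuleNotFoundError as exc:
            without_file = str(exc)

        # "Worker" side with the file available: its directory is on sys.path.
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            with_file = pickle.loads(payload).eval("abc")
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("decrypt_fun", None)
    return without_file, with_file


if __name__ == "__main__":
    print(demo())
```

The first element returned by `demo()` is the same kind of `ModuleNotFoundError` message seen in the TaskManager log. The second shows that the identical payload loads once the module can be imported.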

Regards,
Dian

Re: ModuleNotFound when loading udf from another py file

Yik San Chan
Hi Dian,

Thanks! Adding -pyfs definitely helps.

However, I am curious. If I define my udf this way:

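```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    # Imported inside the function, so the import runs when the UDF is called.
    import pandas as pd
    d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
    return d.get(s, "unknown")
```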

I can `flink run` it without specifying the -pyfs option. Why is that? The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.

Best,
Yik San

Re: ModuleNotFound when loading udf from another py file

Dian Fu
I guess this is the magic of cloudpickle. PyFlink relies on cloudpickle to serialize Python UDFs.

In the latter case (the function defined in the main file), I guess the whole Python UDF implementation is serialized. In the former case (the class imported from decrypt_fun.py), only the path of the class is serialized.
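
A rough way to see the "only the path is serialized" case, sketched with the standard library `pickle` rather than cloudpickle or PyFlink: the payload for a class that lives in an importable module holds the module and class names, not the method bodies. The module content below is a hypothetical stand-in. cloudpickle is documented to follow the same by-reference rule for importable modules and to serialize by value what is defined in `__main__`, which would be consistent with the guess above.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for decrypt_fun.py; the string literal inside eval()
# lets us check whether the method body ends up in the pickled payload.
MODULE_SOURCE = (
    "class Decrypt:\n"
    "    def eval(self, s):\n"
    "        return 'secret-lookup-result'\n"
)


def pickled_class_payload():
    """Pickle a class from an importable module and return the raw bytes."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "decrypt_fun.py"), "w") as f:
            f.write(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("decrypt_fun")
            # Classes are pickled by reference: module name plus class name.
            return pickle.dumps(module.Decrypt)
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("decrypt_fun", None)


if __name__ == "__main__":
    payload = pickled_class_payload()
    print(b"decrypt_fun" in payload)           # module name is in the payload
    print(b"secret-lookup-result" in payload)  # method body is not
```

Because the payload carries only the names, whoever loads it has to be able to import `decrypt_fun`, which is why the file must be shipped with the job.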

Regards,
Dian

On Apr 27, 2021, at 8:01 PM, Yik San Chan <[hidden email]> wrote:

Hi,


I implemented my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and I import the UDF into my main Python file, udf_use_resource.py.

However, after I `flink run` the job, I find the following error in the TaskManager log:

```
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
response = task()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
self.data_channel_factory)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
(transform_id, get_operation(transform_id)) for transform_id in sorted(
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
result = cache[args] = func(*args)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
result = cache[args] = func(*args)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
transform_id, transform_consumers)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
operations.ScalarFunctionOperation)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
super(ScalarFunctionOperation, self).__init__(spec)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
[operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
[operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'decrypt_fun'

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
    ... 1 more
```

I wonder why? If I move the Decrypt class into udf_use_resource.py, everything works just fine.

Thank you!

Best,
Yik San



Re: ModuleNotFound when loading udf from another py file

Yik San Chan
Hi Dian,

Wow, this is unexpected 😮 How about adding documentation about this to the Python UDF docs? It can be time-consuming to figure out. Maybe:

To run Python UDFs in any non-local mode, it is recommended to ship your UDF definitions using the -pyfs option if they live outside the file where the main() function is defined.

What do you think?

Best,
Yik San

On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <[hidden email]> wrote:
I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize the Python UDF.

In the latter case (the UDF is defined in the main file), I guess the whole Python UDF implementation is serialized. In the former case (the UDF is imported from another module), only the path of the class is serialized.

Regards,
Dian
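
The "only the path is serialized" behaviour can be reproduced with the standard library alone. The sketch below is not PyFlink code: it uses the stdlib `pickle` module, which always pickles classes by reference, and assumes cloudpickle treats classes from importable modules the same way. The module name `decrypt_fun_demo` and the class body are hypothetical stand-ins for `decrypt_fun.py`.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for decrypt_fun.py; the class body is illustrative only.
MODULE_SOURCE = "class Decrypt:\n    def eval(self, s):\n        return s[::-1]\n"


def pickle_by_reference_demo():
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "decrypt_fun_demo.py"), "w") as f:
            f.write(MODULE_SOURCE)

        # "Client side": the module is importable, so pickling succeeds.
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        module = importlib.import_module("decrypt_fun_demo")
        payload = pickle.dumps(module.Decrypt())

        # "Worker side": the module file was never shipped.
        sys.path.remove(tmp)
        del sys.modules["decrypt_fun_demo"]

    # The payload holds the module and class names, not the class body.
    has_reference = b"decrypt_fun_demo" in payload and b"Decrypt" in payload
    try:
        pickle.loads(payload)
        error = None
    except ModuleNotFoundError as exc:
        error = str(exc)
    return has_reference, error


if __name__ == "__main__":
    print(pickle_by_reference_demo())
```

Unpickling fails with the same kind of `ModuleNotFoundError` as in the TaskManager log, because the receiving side has to import the module by name. A function or class defined in the main script is a different case: cloudpickle serializes those by value, which would explain why moving the UDF into the main file works.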

On Apr 27, 2021, at 8:52 PM, Yik San Chan <[hidden email]> wrote:

Hi Dian,

Thanks! Adding -pyfs definitely helps.

However, I am curious. If I define my udf this way:

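```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    # Load the lookup table from the shipped archive and map s to its value.
    import pandas as pd
    d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
    return d.get(s, "unknown")
```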

I can `flink run` without specifying the -pyfs option. The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607. Why is that?

Best,
Yik San

On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <[hidden email]> wrote:
Hi Yik San,

From the exception message, it’s clear that the module `decrypt_fun` could not be found during execution.

You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command-line argument. Otherwise, this file will not be available during execution.

Regards,
Dian
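
For reference, a submission that ships the extra file might be assembled as in the sketch below. The helper `build_flink_run_command` is hypothetical and only builds the argument list, it does not run anything; `-py` and `-pyfs` are the `flink run` options for the entry script and the comma-separated extra Python files, and the file names are the ones from this thread.

```python
import shlex


def build_flink_run_command(main_script, extra_python_files):
    """Return a `flink run` argument list that ships extra Python files.

    -py names the entry-point script; -pyfs takes a comma-separated list
    of files or directories to make importable on the workers.
    """
    command = ["flink", "run", "-py", main_script]
    if extra_python_files:
        command += ["-pyfs", ",".join(extra_python_files)]
    return command


command = build_flink_run_command("udf_use_resource.py", ["decrypt_fun.py"])
# Print a copy-pasteable shell command line.
print(" ".join(shlex.quote(part) for part in command))
```

The resulting list can be passed to `subprocess.run` as-is. As far as I know, PyFlink also exposes `TableEnvironment.add_python_file` for declaring the same dependency from inside the job script, but that route is not discussed in this thread.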




Re: ModuleNotFound when loading udf from another py file

Dian Fu
Hi Yik San,

Makes sense to me. :)

Regards,
Dian





Re: ModuleNotFound when loading udf from another py file

Yik San Chan
Hi Dian,


On Apr 27, 2021, at 8:01 PM, Yik San Chan <[hidden email]> wrote:

Hi,


I implemented my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and tried to import the UDF into my main Python file named udf_use_resource.py.

However, after I `flink run`, I find the following error in the TaskManager log:

```
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
response = task()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
self.data_channel_factory)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
(transform_id, get_operation(transform_id)) for transform_id in sorted(
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
result = cache[args] = func(*args)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
result = cache[args] = func(*args)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
transform_id, transform_consumers)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
operations.ScalarFunctionOperation)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
super(ScalarFunctionOperation, self).__init__(spec)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
[operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
[operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'decrypt_fun'

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
    ... 1 more
```

I wonder why? If I move the Decrypt class into udf_use_resource.py, everything works just fine.

Thank you!

Best,
Yik San



Re: ModuleNotFound when loading udf from another py file

Dian Fu
Thanks a lot~

On Apr 28, 2021, at 8:25 AM, Yik San Chan <[hidden email]> wrote:

Hi Dian,


On Tue, Apr 27, 2021 at 11:03 PM Dian Fu <[hidden email]> wrote:
Hi Yik San,

Makes sense to me. :)

Regards,
Dian

On Apr 27, 2021, at 9:50 PM, Yik San Chan <[hidden email]> wrote:

Hi Dian,

Wow, this is unexpected 😮 How about adding a note about this to the Python UDF documentation? It can be time-consuming to figure out. Maybe:

To run Python UDFs in any non-local mode, it is recommended to include your UDF definitions using the -pyfs option if your UDFs live outside the file where the main() function is defined.

What do you think?

Best,
Yik San

On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <[hidden email]> wrote:
I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize Python UDFs.

In the latter case (the UDF defined in the main file), I guess the whole Python UDF implementation is serialized. In the former case (the UDF imported from decrypt_fun.py), however, only the import path of the class is serialized.
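The "only the path is serialized" case can be seen in the pickled bytes themselves. Below is a minimal sketch using only the standard-library `pickle`, which always pickles classes by reference; as far as I know, cloudpickle does the same for a class that lives in an importable module and pickles by value only what is defined in `__main__`. The module `decrypt_fun_demo` and its `Decrypt.eval` body are hypothetical stand-ins, not the code from this thread.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for decrypt_fun.py; not the UDF from this thread.
MODULE_SOURCE = (
    "class Decrypt:\n"
    "    def eval(self, s):\n"
    "        return s[::-1]\n"
)


def pickle_by_reference():
    """Pickle an instance of a class defined in a separate module; return the bytes."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "decrypt_fun_demo.py"), "w") as f:
            f.write(MODULE_SOURCE)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module("decrypt_fun_demo")
            return pickle.dumps(module.Decrypt())
        finally:
            # Leave the interpreter as we found it.
            sys.path.remove(tmp)
            sys.modules.pop("decrypt_fun_demo", None)


payload = pickle_by_reference()
# The payload names the module and the class ...
print(b"decrypt_fun_demo" in payload, b"Decrypt" in payload)
# ... but carries nothing of the method itself.
print(b"eval" in payload)
```

Whoever unpickles such a payload has to import `decrypt_fun_demo` on its own, which is the step that fails on the worker when the file was never shipped.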

Regards,
Dian

On Apr 27, 2021, at 8:52 PM, Yik San Chan <[hidden email]> wrote:

Hi Dian,

Thanks! Adding -pyfs definitely helps.

However, I am curious. If I define my udf this way:

```python
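from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    # Import inside the function so it is resolved where the UDF runs.
    import pandas as pd
    # Read the two-column CSV into a {key: value} lookup dict.
    d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
    return d.get(s, "unknown")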
```

I can `flink run` without having to specify -pyfs option. The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607. I wonder why?

Best,
Yik San

On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <[hidden email]> wrote:
Hi Yik San,

From the exception message, it’s clear that the module `decrypt_fun` could not be found during execution.

You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command-line argument. Otherwise, this file will not be available during execution.
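The failure and the fix can be reproduced without a cluster. This is a minimal sketch, assuming that -pyfs ends up putting the listed files on the Python worker's PYTHONPATH; it uses the standard-library `pickle` rather than PyFlink, and the module name `decrypt_fun_demo` and the helper names are hypothetical.

```python
import importlib
import os
import pickle
import sys
import tempfile

MODULE_NAME = "decrypt_fun_demo"  # hypothetical stand-in for decrypt_fun
MODULE_SOURCE = (
    "class Decrypt:\n"
    "    def eval(self, s):\n"
    "        return s.upper()\n"
)


def forget_module(path):
    """Make this process look like a worker that never received the file."""
    if path in sys.path:
        sys.path.remove(path)
    sys.modules.pop(MODULE_NAME, None)
    importlib.invalidate_caches()


def simulate_worker():
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, MODULE_NAME + ".py"), "w") as f:
            f.write(MODULE_SOURCE)

        # "Client" side: the module is importable, so pickling succeeds.
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        payload = pickle.dumps(importlib.import_module(MODULE_NAME).Decrypt())

        # "Worker" without the file: unpickling must import the module and fails.
        forget_module(tmp)
        try:
            pickle.loads(payload)
            error = None
        except ModuleNotFoundError as exc:
            error = str(exc)

        # "Worker" with the file on its path: the same payload now loads.
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            result = pickle.loads(payload).eval("abc")
        finally:
            forget_module(tmp)
    return error, result


error, result = simulate_worker()
print(error)
print(result)
```

The first line printed is the same kind of `No module named ...` message as in the TaskManager log; the second shows that the unchanged payload loads once the file is reachable.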

Regards,
Dian

On Apr 27, 2021, at 8:01 PM, Yik San Chan <[hidden email]> wrote:

[quoted text trimmed: identical to the original post at the top of this thread]