Hi community,
I run into this odd issue when running this locally: ``` ~/softwares/flink-1.12.0/bin/flink run -d -pyexec /usr/local/anaconda3/envs/pyflink-quickstart/bin/python -pyarch resources.zip -pyfs decrypt_fun.py -py udf_use_resource.py ``` In TaskManager Logs, it says: ``` 2021-04-28 20:32:46,332 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (830a34b910582c535c25b1733630a801) switched from RUNNING to FAILED. java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_282] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:128) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.lang.RuntimeException: Failed to close remote bundle at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:206) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282] at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:206) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-d813143531d67e63b3fce5bcc6de6380:1.12.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282] ... 1 more 2021-04-28 20:32:46,338 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (830a34b910582c535c25b1733630a801). 2021-04-28 20:32:46,352 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 830a34b910582c535c25b1733630a801. 2021-04-28 20:32:46,454 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 7bb923feffb3eeb4064c3dcf97053810, jobId: e3fda4e1369f30007073b02dcc72d0cc). 2021-04-28 20:32:46,457 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job e3fda4e1369f30007073b02dcc72d0cc from job leader monitoring. 2021-04-28 20:32:46,458 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job e3fda4e1369f30007073b02dcc72d0cc. 2021-04-28 20:33:21,625 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 67c3bb602e3b4c9afdba15e1b9b1a998 for job aa5b81f2862b5ef4da119cf0fc1d8de0 from resource manager with leader id 00000000000000000000000000000000. 2021-04-28 20:33:21,626 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 67c3bb602e3b4c9afdba15e1b9b1a998. 2021-04-28 20:33:21,626 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job aa5b81f2862b5ef4da119cf0fc1d8de0 for job leader monitoring. 2021-04-28 20:33:21,626 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_3 with leader id 00000000-0000-0000-0000-000000000000. 2021-04-28 20:33:21,634 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2021-04-28 20:33:21,647 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_3 for job aa5b81f2862b5ef4da119cf0fc1d8de0. 2021-04-28 20:33:21,647 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job aa5b81f2862b5ef4da119cf0fc1d8de0. 2021-04-28 20:33:21,648 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job aa5b81f2862b5ef4da119cf0fc1d8de0. 2021-04-28 20:33:21,656 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 67c3bb602e3b4c9afdba15e1b9b1a998. 2021-04-28 20:33:21,657 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c), deploy into slot with allocation id 67c3bb602e3b4c9afdba15e1b9b1a998. 2021-04-28 20:33:21,658 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 67c3bb602e3b4c9afdba15e1b9b1a998. 2021-04-28 20:33:21,658 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c) switched from CREATED to DEPLOYING. 2021-04-28 20:33:21,658 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c) [DEPLOYING]. 2021-04-28 20:33:21,659 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading aa5b81f2862b5ef4da119cf0fc1d8de0/p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585 from localhost/127.0.0.1:60691 2021-04-28 20:33:21,877 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c) [DEPLOYING]. 2021-04-28 20:33:21,877 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'python_file_e30993dc0ed6e9ceb07477d5845a17f7f912d36a76073016dd68ee1bbb72c898'. 2021-04-28 20:33:21,877 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'python_archive_a641ab3c2118955827b6883737b6b56ada29b774ac4ebdc68fbe86954f408203'. 2021-04-28 20:33:21,878 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading aa5b81f2862b5ef4da119cf0fc1d8de0/p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-ec48f21df40ec0dca89972092c7efdfb from localhost/127.0.0.1:60691 2021-04-28 20:33:21,878 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2021-04-28 20:33:21,878 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading aa5b81f2862b5ef4da119cf0fc1d8de0/p-dc1b94442abf644a62b5107ccbfe4c10d1d1856e-400f692d090a8e3a062e374933d5fa96 from localhost/127.0.0.1:60691 2021-04-28 20:33:21,878 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c) switched from DEPLOYING to RUNNING. 2021-04-28 20:33:21,905 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) exceeded the 80 characters length limit and was truncated. 2021-04-28 20:33:21,906 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 100000. 2021-04-28 20:33:21,907 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1000 milliseconds. 2021-04-28 20:33:21,952 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2021-04-28 20:33:21,952 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123 2021-04-28 20:33:21,952 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m 2021-04-28 20:33:21,952 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m 2021-04-28 20:33:21,952 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1 2021-04-28 20:33:21,952 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1 2021-04-28 20:33:21,953 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2021-04-28 20:33:22,543 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - PYTHONPATH of python worker: /var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-f00bfb4e-2487-4402-b034-c09ce91f645b/python-files/blob_p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-ec48f21df40ec0dca89972092c7efdfb 2021-04-28 20:33:22,543 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-f00bfb4e-2487-4402-b034-c09ce91f645b/python-archives 2021-04-28 20:33:22,545 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: /usr/local/anaconda3/envs/featflow-ml-env/bin/python 2021-04-28 20:33:22,585 INFO org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner [] - Obtained shared Python process of size 536870920 bytes 2021-04-28 20:33:24,871 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected. 2021-04-28 20:33:24,880 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:98 [] - Logging handler created. 2021-04-28 20:33:24,880 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:125 [] - semi_persistent_directory: /tmp 2021-04-28 20:33:24,881 WARN /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:240 [] - No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail. 2021-04-28 20:33:24,881 WARN /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/options/pipeline_options.py:309 [] - Discarding unparseable args: ['--options_id=0.0', '--app_name=BeamPythonFunctionRunner'] 2021-04-28 20:33:24,881 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:138 [] - Python sdk harness started with pipeline_options: {} 2021-04-28 20:33:24,881 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/statecache.py:154 [] - Creating state cache with size 0 2021-04-28 20:33:24,881 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:155 [] - Creating insecure control channel for localhost:60777. 2021-04-28 20:33:24,881 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:79 [] - Status HTTP server running at 1.0.0.127.in-addr.arpa:60792 2021-04-28 20:33:24,882 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:163 [] - Control channel established. 2021-04-28 20:33:24,882 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:203 [] - Initializing SDKHarness with unbounded number of workers. 2021-04-28 20:33:24,882 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - Beam Fn Control client connected with id 1-1 2021-04-28 20:33:24,999 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-2 2021-04-28 20:33:25,004 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:726 [] - Creating insecure state channel for localhost:60782. 2021-04-28 20:33:25,005 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:733 [] - State channel established. 2021-04-28 20:33:25,006 INFO /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:636 [] - Creating client data channel for localhost:60781 2021-04-28 20:33:25,010 INFO org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn Data client connected. 2021-04-28 20:33:25,012 ERROR /usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:260 [] - Error processing instruction 1. Original traceback is Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 2021-04-28 20:33:25,034 INFO org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Closing environment urn: "beam:env:process:v1" payload: "\032g/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/bin/pyflink-udf-runner.sh\"\262\001\n\004PATH\022\251\001/usr/local/anaconda3/bin:/usr/local/anaconda3/condabin:/Users/chenyisheng/softwares/apache-maven-3.6.3/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/go/bin\"G\n\017HADOOP_CONF_DIR\0224/Users/chenyisheng/softwares/hadoop-2.7.3/etc/hadoop\"\025\n\vLC_TERMINAL\022\006iTerm2\"$\n\vhttps_proxy\022\025http://127.0.0.1:7890\"\031\n\021CONDA_DEFAULT_ENV\022\004base\"F\n\021FLINK_PLUGINS_DIR\0221/Users/chenyisheng/softwares/flink-1.12.0/plugins\"3\n\020CONDA_PYTHON_EXE\022\037/usr/local/anaconda3/bin/python\"$\n\023table.exec.timezone\022\rAsia/Shanghai\"$\n\fCONDA_PREFIX\022\024/usr/local/anaconda3\"\026\n\tCOLORTERM\022\ttruecolor\"@\n\016FLINK_CONF_DIR\022./Users/chenyisheng/softwares/flink-1.12.0/conf\"\027\n\023FLINK_ENV_JAVA_OPTS\022\000\"\026\n\aLOGNAME\022\vchenyisheng\">\n\003PWD\0227/Users/chenyisheng/source/yiksanchan/pyflink-quickstart\"\035\n\024TERM_PROGRAM_VERSION\022\0053.4.4\"\316\001\n\nPYTHONPATH\022\277\001/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-f00bfb4e-2487-4402-b034-c09ce91f645b/python-files/blob_p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-ec48f21df40ec0dca89972092c7efdfb\">\n\006python\0224/usr/local/anaconda3/envs/featflow-ml-env/bin/python\"\021\n\005SHELL\022\b/bin/zsh\"\r\n\005PAGER\022\004less\"\031\n\023MAX_LOG_FILE_NUMBER\022\00210\"\032\n\021SECURITYSESSIONID\022\005186a6\"\210\001\n\023_PYTHON_WORKING_DIR\022q/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-f00bfb4e-2487-4402-b034-c09ce91f645b/python-archives\"P\n\025JAVA_MAIN_CLASS_27268\0227org.apache.flink.runtime.taskexecutor.TaskManagerRunner\"$\n\003ZSH\022\035/Users/chenyisheng/.oh-my-zsh\"\030\n\rITERM_PROFILE\022\aDefault\";\n\006TMPDIR\0221/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/\"\020\n\tXPC_FLAGS\022\0030x0\">\n\017TERM_SESSION_ID\022+w0t2p0:DD76AB31-797F-4117-8904-00DFE010444A\"8\n\vHADOOP_HOME\022)/Users/chenyisheng/softwares/hadoop-2.7.3\"(\n\027__CF_USER_TEXT_ENCODING\022\r0x1F5:0x0:0x0\" \n\025CONDA_PROMPT_MODIFIER\022\a(base) \"\n\n\004LESS\022\002-R\"\021\n\bLC_CTYPE\022\005UTF-8\"\020\n\tCOLORFGBG\022\0037;0\"\n\n\005SHLVL\022\0014\">\n\rFLINK_BIN_DIR\022-/Users/chenyisheng/softwares/flink-1.12.0/bin\"+\n\tCONDA_EXE\022\036/usr/local/anaconda3/bin/conda\"O\n\tJAVA_HOME\022B/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home\"\026\n\004TERM\022\016xterm-256color\"\030\n\fCOMMAND_MODE\022\bunix2003\"q\n\fBOOT_LOG_DIR\022a/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-f00bfb4e-2487-4402-b034-c09ce91f645b\"(\n\aGOPROXY\022\035http://goproxy.pri.ibanyu.com\"?\n\020ITERM_SESSION_ID\022+w0t2p0:DD76AB31-797F-4117-8904-00DFE010444A\"\t\n\005_CE_M\022\000\"(\n\033_PYTHON_WORKER_MEMORY_LIMIT\022\t536870920\"$\n\tall_proxy\022\027socks5://127.0.0.1:7891\"\025\n\020XPC_SERVICE_NAME\022\0010\"\020\n\vCONDA_SHLVL\022\0011\"P\n\001_\022K/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/bin/java\"#\n\nhttp_proxy\022\025http://127.0.0.1:7890\">\n\rFLINK_OPT_DIR\022-/Users/chenyisheng/softwares/flink-1.12.0/opt\"\034\n\023LC_TERMINAL_VERSION\022\0053.4.4\"\031\n\fTERM_PROGRAM\022\tiTerm.app\"\"\n\bLSCOLORS\022\026Gxfxcxdxbxegedabagacad\" \n\030PYFLINK_GATEWAY_DISABLED\022\004true\"S\n$SQLITE_EXEMPT_PATH_FROM_VNODE_GUARDS\022+/Users/chenyisheng/Library/WebKit/Databases\">\n\rFLINK_LIB_DIR\022-/Users/chenyisheng/softwares/flink-1.12.0/lib\"\023\n\004USER\022\vchenyisheng\"8\n\020LaunchInstanceID\022$0FB0C719-10F8-4B20-A666-B827E3E7E67E\"D\n\rSSH_AUTH_SOCK\0223/private/tmp/com.apple.launchd.Af15E9TVkQ/Listeners\"\r\n\t_CE_CONDA\022\000\"\300\021\n\024FLINK_INHERITED_LOGS\022\247\021\n\n\nTM_RESOURCE_PARAMS extraction logs:\njvm_params: -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456\ndynamic_configs: -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.network.min=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.memory.task.off-heap.size=0b\nlogs: INFO [] - Loading configuration property: jobmanager.rpc.address, localhost\nINFO [] - Loading configuration property: jobmanager.rpc.port, 6123\nINFO [] - Loading configuration property: jobmanager.memory.process.size, 1600m\nINFO [] - Loading configuration property: taskmanager.memory.process.size, 1728m\nINFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1\nINFO [] - Loading configuration property: parallelism.default, 1\nINFO [] - Loading configuration property: jobmanager.execution.failover-strategy, region\nINFO [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead\nINFO [] - Final TaskExecutor Memory configuration:\nINFO [] - Total Process Memory: 1.688gb (1811939328 bytes)\nINFO [] - Total Flink Memory: 1.250gb (1342177280 bytes)\nINFO [] - Total JVM Heap Memory: 512.000mb (536870902 bytes)\nINFO [] - Framework: 128.000mb (134217728 bytes)\nINFO [] - Task: 384.000mb (402653174 bytes)\nINFO [] - Total Off-heap Memory: 768.000mb (805306378 bytes)\nINFO [] - Managed: 512.000mb (536870920 bytes)\nINFO [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)\nINFO [] - Framework: 128.000mb (134217728 bytes)\nINFO [] - Task: 0 bytes\nINFO [] - Network: 128.000mb (134217730 bytes)\nINFO [] - JVM Metaspace: 256.000mb (268435456 bytes)\nINFO [] - JVM Overhead: 192.000mb (201326592 bytes)\n\"v\n\bJVM_ARGS\022j -XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456\"\032\n\004HOME\022\022/Users/chenyisheng" 2021-04-28 20:33:25,036 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - 1 Beam Fn Logging clients still connected during shutdown. 2021-04-28 20:33:25,041 WARN org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint. 2021-04-28 20:33:25,041 WARN org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint. 2021-04-28 20:33:25,217 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c) switched from RUNNING to FAILED. java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_282] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:128) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.lang.RuntimeException: Failed to close remote bundle at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:206) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282] at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:206) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-67e5885bf7764a1f15c4c20af9ab8585:1.12.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282] ... 1 more 2021-04-28 20:33:25,219 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (1dcc1ec7e4771edc8e69408879c6015c). 2021-04-28 20:33:25,223 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 1dcc1ec7e4771edc8e69408879c6015c. 2021-04-28 20:33:25,264 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 67c3bb602e3b4c9afdba15e1b9b1a998, jobId: aa5b81f2862b5ef4da119cf0fc1d8de0). 2021-04-28 20:33:25,264 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job aa5b81f2862b5ef4da119cf0fc1d8de0 from job leader monitoring. 2021-04-28 20:33:25,265 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job aa5b81f2862b5ef4da119cf0fc1d8de0. 2021-04-28 20:40:23,727 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request ec619197d53f5947072c3565893ec54d for job 00c7bbd130d9fe97d3d36343f9fdf77e from resource manager with leader id 00000000000000000000000000000000. 2021-04-28 20:40:23,727 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for ec619197d53f5947072c3565893ec54d. 2021-04-28 20:40:23,727 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 00c7bbd130d9fe97d3d36343f9fdf77e for job leader monitoring. 2021-04-28 20:40:23,727 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_4 with leader id 00000000-0000-0000-0000-000000000000. 2021-04-28 20:40:23,733 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2021-04-28 20:40:23,751 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_4 for job 00c7bbd130d9fe97d3d36343f9fdf77e. 2021-04-28 20:40:23,751 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 00c7bbd130d9fe97d3d36343f9fdf77e. 2021-04-28 20:40:23,751 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 00c7bbd130d9fe97d3d36343f9fdf77e. 2021-04-28 20:40:23,759 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot ec619197d53f5947072c3565893ec54d. 2021-04-28 20:40:23,760 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot ec619197d53f5947072c3565893ec54d. 2021-04-28 20:40:23,763 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8), deploy into slot with allocation id ec619197d53f5947072c3565893ec54d. 2021-04-28 20:40:23,763 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8) switched from CREATED to DEPLOYING. 2021-04-28 20:40:23,764 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8) [DEPLOYING]. 2021-04-28 20:40:23,765 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 00c7bbd130d9fe97d3d36343f9fdf77e/p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d from localhost/127.0.0.1:60691 2021-04-28 20:40:23,993 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8) [DEPLOYING]. 2021-04-28 20:40:23,994 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'python_file_e30993dc0ed6e9ceb07477d5845a17f7f912d36a76073016dd68ee1bbb72c898'. 2021-04-28 20:40:23,994 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'python_archive_a641ab3c2118955827b6883737b6b56ada29b774ac4ebdc68fbe86954f408203'. 2021-04-28 20:40:23,995 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 00c7bbd130d9fe97d3d36343f9fdf77e/p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-46e4af3f6053b2334f40d3fbc7843799 from localhost/127.0.0.1:60691 2021-04-28 20:40:23,995 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2021-04-28 20:40:23,995 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 00c7bbd130d9fe97d3d36343f9fdf77e/p-dc1b94442abf644a62b5107ccbfe4c10d1d1856e-7f92d5c4d502c045737a1403aeedc3c7 from localhost/127.0.0.1:60691 2021-04-28 20:40:23,996 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8) switched from DEPLOYING to RUNNING. 2021-04-28 20:40:24,028 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) exceeded the 80 characters length limit and was truncated. 2021-04-28 20:40:24,032 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 100000. 2021-04-28 20:40:24,032 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1000 milliseconds. 2021-04-28 20:40:24,083 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2021-04-28 20:40:24,083 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123 2021-04-28 20:40:24,083 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m 2021-04-28 20:40:24,083 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m 2021-04-28 20:40:24,083 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1 2021-04-28 20:40:24,083 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1 2021-04-28 20:40:24,084 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2021-04-28 20:40:24,768 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - PYTHONPATH of python worker: /var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-636e4c65-2c06-4a8a-a297-fd6026dba20a/python-files/blob_p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-46e4af3f6053b2334f40d3fbc7843799 2021-04-28 20:40:24,768 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-636e4c65-2c06-4a8a-a297-fd6026dba20a/python-archives 2021-04-28 20:40:24,771 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: /usr/local/anaconda3/envs/featflow-env/bin/python 2021-04-28 20:40:24,787 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8) switched from RUNNING to FAILED. java.io.IOException: Cannot run program "/usr/local/anaconda3/envs/featflow-env/bin/python" (in directory "/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-636e4c65-2c06-4a8a-a297-fd6026dba20a/python-archives"): error=2, No such file or directory at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) ~[?:1.8.0_282] at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:134) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:94) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-a066300cab552a2421b49d8bbf0ff62d:1.12.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] Caused by: java.io.IOException: error=2, No such file or directory at java.lang.UNIXProcess.forkAndExec(Native Method) ~[?:1.8.0_282] at java.lang.UNIXProcess.<init>(UNIXProcess.java:247) ~[?:1.8.0_282] at java.lang.ProcessImpl.start(ProcessImpl.java:134) ~[?:1.8.0_282] at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029) ~[?:1.8.0_282] ... 23 more 2021-04-28 20:40:24,788 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (90f78a41e9e9499057efa577bed5d2c8). 2021-04-28 20:40:24,790 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 90f78a41e9e9499057efa577bed5d2c8. 2021-04-28 20:40:24,820 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: ec619197d53f5947072c3565893ec54d, jobId: 00c7bbd130d9fe97d3d36343f9fdf77e). 2021-04-28 20:40:24,820 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 00c7bbd130d9fe97d3d36343f9fdf77e from job leader monitoring. 2021-04-28 20:40:24,820 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 00c7bbd130d9fe97d3d36343f9fdf77e. 2021-04-28 20:41:15,713 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request dfe1e0c2981f1173082c08b57098e378 for job fc16d53246fa73c04db7a3de31a2da3a from resource manager with leader id 00000000000000000000000000000000. 2021-04-28 20:41:15,713 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for dfe1e0c2981f1173082c08b57098e378. 2021-04-28 20:41:15,713 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job fc16d53246fa73c04db7a3de31a2da3a for job leader monitoring. 2021-04-28 20:41:15,713 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_5 with leader id 00000000-0000-0000-0000-000000000000. 2021-04-28 20:41:15,717 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration 2021-04-28 20:41:15,733 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@localhost:6123/user/rpc/jobmanager_5 for job fc16d53246fa73c04db7a3de31a2da3a. 2021-04-28 20:41:15,733 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job fc16d53246fa73c04db7a3de31a2da3a. 2021-04-28 20:41:15,734 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job fc16d53246fa73c04db7a3de31a2da3a. 2021-04-28 20:41:15,742 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot dfe1e0c2981f1173082c08b57098e378. 2021-04-28 20:41:15,745 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot dfe1e0c2981f1173082c08b57098e378. 2021-04-28 20:41:15,751 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (c5ba490b7c3a195979f46aa6b6005ad6), deploy into slot with allocation id dfe1e0c2981f1173082c08b57098e378. 2021-04-28 20:41:15,751 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (c5ba490b7c3a195979f46aa6b6005ad6) switched from CREATED to DEPLOYING. 2021-04-28 20:41:15,752 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (c5ba490b7c3a195979f46aa6b6005ad6) [DEPLOYING]. 2021-04-28 20:41:15,753 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading fc16d53246fa73c04db7a3de31a2da3a/p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65 from localhost/127.0.0.1:60691 2021-04-28 20:41:15,962 INFO org.apache.flink.runtime.taskmanager.Task [] - Registering task at network: Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (c5ba490b7c3a195979f46aa6b6005ad6) [DEPLOYING]. 2021-04-28 20:41:15,962 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'python_file_e30993dc0ed6e9ceb07477d5845a17f7f912d36a76073016dd68ee1bbb72c898'. 2021-04-28 20:41:15,963 INFO org.apache.flink.runtime.taskmanager.Task [] - Obtaining local cache file for 'python_archive_a641ab3c2118955827b6883737b6b56ada29b774ac4ebdc68fbe86954f408203'. 2021-04-28 20:41:15,964 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading fc16d53246fa73c04db7a3de31a2da3a/p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-7865f2ca7662f8b58a0c172899fdc38b from localhost/127.0.0.1:60691 2021-04-28 20:41:15,964 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 2021-04-28 20:41:15,964 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (c5ba490b7c3a195979f46aa6b6005ad6) switched from DEPLOYING to RUNNING. 2021-04-28 20:41:15,964 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading fc16d53246fa73c04db7a3de31a2da3a/p-dc1b94442abf644a62b5107ccbfe4c10d1d1856e-eb1f42a8eabe7919889b1ea05e22dcce from localhost/127.0.0.1:60691 2021-04-28 20:41:16,016 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) exceeded the 80 characters length limit and was truncated. 2021-04-28 20:41:16,018 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 100000. 2021-04-28 20:41:16,018 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1000 milliseconds. 2021-04-28 20:41:16,053 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2021-04-28 20:41:16,053 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123 2021-04-28 20:41:16,054 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m 2021-04-28 20:41:16,054 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m 2021-04-28 20:41:16,054 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1 2021-04-28 20:41:16,054 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1 2021-04-28 20:41:16,054 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2021-04-28 20:41:16,605 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - PYTHONPATH of python worker: /var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-febdccd8-3a52-4960-b9be-b884b4e251f9/python-files/blob_p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-7865f2ca7662f8b58a0c172899fdc38b 2021-04-28 20:41:16,606 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-febdccd8-3a52-4960-b9be-b884b4e251f9/python-archives 2021-04-28 20:41:16,614 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: /usr/local/anaconda3/envs/pyflink-quickstart/bin/python 2021-04-28 20:41:16,687 INFO org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner [] - Obtained shared Python process of size 536870920 bytes 2021-04-28 20:41:21,949 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/bin/pyflink-udf-runner.sh' for worker id 1-1 2021-04-28 20:41:25,608 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected. 2021-04-28 20:41:25,620 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:98 [] - Logging handler created. 2021-04-28 20:41:25,621 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:125 [] - semi_persistent_directory: /tmp 2021-04-28 20:41:25,621 WARN /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:240 [] - No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail. 2021-04-28 20:41:25,622 WARN /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/options/pipeline_options.py:309 [] - Discarding unparseable args: ['--app_name=BeamPythonFunctionRunner', '--options_id=0.0'] 2021-04-28 20:41:25,621 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - Beam Fn Control client connected with id 1-1 2021-04-28 20:41:25,622 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:138 [] - Python sdk harness started with pipeline_options: {} 2021-04-28 20:41:25,622 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/statecache.py:154 [] - Creating state cache with size 0 2021-04-28 20:41:25,623 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:155 [] - Creating insecure control channel for localhost:61292. 2021-04-28 20:41:25,624 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:79 [] - Status HTTP server running at 1.0.0.127.in-addr.arpa:61331 2021-04-28 20:41:25,624 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:163 [] - Control channel established. 2021-04-28 20:41:25,625 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:203 [] - Initializing SDKHarness with unbounded number of workers. 2021-04-28 20:41:25,799 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-2 2021-04-28 20:41:25,808 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:726 [] - Creating insecure state channel for localhost:61297. 2021-04-28 20:41:25,808 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:733 [] - State channel established. 2021-04-28 20:41:25,810 INFO /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:636 [] - Creating client data channel for localhost:61296 2021-04-28 20:41:25,813 INFO org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn Data client connected. 2021-04-28 20:41:25,820 ERROR /usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:260 [] - Error processing instruction 1. Original traceback is Traceback (most recent call last): File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 2021-04-28 20:41:25,850 INFO org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory [] - Closing environment urn: "beam:env:process:v1" payload: "\032j/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/bin/pyflink-udf-runner.sh\"\262\001\n\004PATH\022\251\001/usr/local/anaconda3/bin:/usr/local/anaconda3/condabin:/Users/chenyisheng/softwares/apache-maven-3.6.3/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/go/bin\"G\n\017HADOOP_CONF_DIR\0224/Users/chenyisheng/softwares/hadoop-2.7.3/etc/hadoop\"\025\n\vLC_TERMINAL\022\006iTerm2\"$\n\vhttps_proxy\022\025http://127.0.0.1:7890\"\031\n\021CONDA_DEFAULT_ENV\022\004base\"F\n\021FLINK_PLUGINS_DIR\0221/Users/chenyisheng/softwares/flink-1.12.0/plugins\"3\n\020CONDA_PYTHON_EXE\022\037/usr/local/anaconda3/bin/python\"$\n\023table.exec.timezone\022\rAsia/Shanghai\"$\n\fCONDA_PREFIX\022\024/usr/local/anaconda3\"\026\n\tCOLORTERM\022\ttruecolor\"@\n\016FLINK_CONF_DIR\022./Users/chenyisheng/softwares/flink-1.12.0/conf\"\027\n\023FLINK_ENV_JAVA_OPTS\022\000\"\026\n\aLOGNAME\022\vchenyisheng\">\n\003PWD\0227/Users/chenyisheng/source/yiksanchan/pyflink-quickstart\"\035\n\024TERM_PROGRAM_VERSION\022\0053.4.4\"\316\001\n\nPYTHONPATH\022\277\001/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-febdccd8-3a52-4960-b9be-b884b4e251f9/python-files/blob_p-fd53b6c450e5bfaa892f48ac3165814b3e6f59ec-7865f2ca7662f8b58a0c172899fdc38b\"A\n\006python\0227/usr/local/anaconda3/envs/pyflink-quickstart/bin/python\"\021\n\005SHELL\022\b/bin/zsh\"\r\n\005PAGER\022\004less\"\031\n\023MAX_LOG_FILE_NUMBER\022\00210\"\032\n\021SECURITYSESSIONID\022\005186a6\"\210\001\n\023_PYTHON_WORKING_DIR\022q/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-febdccd8-3a52-4960-b9be-b884b4e251f9/python-archives\"P\n\025JAVA_MAIN_CLASS_27268\0227org.apache.flink.runtime.taskexecutor.TaskManagerRunner\"$\n\003ZSH\022\035/Users/chenyisheng/.oh-my-zsh\"\030\n\rITERM_PROFILE\022\aDefault\";\n\006TMPDIR\0221/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/\"\020\n\tXPC_FLAGS\022\0030x0\">\n\017TERM_SESSION_ID\022+w0t2p0:DD76AB31-797F-4117-8904-00DFE010444A\"8\n\vHADOOP_HOME\022)/Users/chenyisheng/softwares/hadoop-2.7.3\"(\n\027__CF_USER_TEXT_ENCODING\022\r0x1F5:0x0:0x0\" \n\025CONDA_PROMPT_MODIFIER\022\a(base) \"\n\n\004LESS\022\002-R\"\021\n\bLC_CTYPE\022\005UTF-8\"\020\n\tCOLORFGBG\022\0037;0\"\n\n\005SHLVL\022\0014\">\n\rFLINK_BIN_DIR\022-/Users/chenyisheng/softwares/flink-1.12.0/bin\"+\n\tCONDA_EXE\022\036/usr/local/anaconda3/bin/conda\"O\n\tJAVA_HOME\022B/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home\"\026\n\004TERM\022\016xterm-256color\"\030\n\fCOMMAND_MODE\022\bunix2003\"q\n\fBOOT_LOG_DIR\022a/var/folders/mv/cqj767rd5631xfy3hhrl8lcm0000gn/T/python-dist-febdccd8-3a52-4960-b9be-b884b4e251f9\"(\n\aGOPROXY\022\035http://goproxy.pri.ibanyu.com\"?\n\020ITERM_SESSION_ID\022+w0t2p0:DD76AB31-797F-4117-8904-00DFE010444A\"\t\n\005_CE_M\022\000\"(\n\033_PYTHON_WORKER_MEMORY_LIMIT\022\t536870920\"$\n\tall_proxy\022\027socks5://127.0.0.1:7891\"\025\n\020XPC_SERVICE_NAME\022\0010\"\020\n\vCONDA_SHLVL\022\0011\"P\n\001_\022K/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/bin/java\"#\n\nhttp_proxy\022\025http://127.0.0.1:7890\">\n\rFLINK_OPT_DIR\022-/Users/chenyisheng/softwares/flink-1.12.0/opt\"\034\n\023LC_TERMINAL_VERSION\022\0053.4.4\"\031\n\fTERM_PROGRAM\022\tiTerm.app\"\"\n\bLSCOLORS\022\026Gxfxcxdxbxegedabagacad\" \n\030PYFLINK_GATEWAY_DISABLED\022\004true\"S\n$SQLITE_EXEMPT_PATH_FROM_VNODE_GUARDS\022+/Users/chenyisheng/Library/WebKit/Databases\">\n\rFLINK_LIB_DIR\022-/Users/chenyisheng/softwares/flink-1.12.0/lib\"\023\n\004USER\022\vchenyisheng\"8\n\020LaunchInstanceID\022$0FB0C719-10F8-4B20-A666-B827E3E7E67E\"D\n\rSSH_AUTH_SOCK\0223/private/tmp/com.apple.launchd.Af15E9TVkQ/Listeners\"\r\n\t_CE_CONDA\022\000\"\300\021\n\024FLINK_INHERITED_LOGS\022\247\021\n\n\nTM_RESOURCE_PARAMS extraction logs:\njvm_params: -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456\ndynamic_configs: -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=134217730b -D taskmanager.memory.network.min=134217730b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=402653174b -D taskmanager.memory.task.off-heap.size=0b\nlogs: INFO [] - Loading configuration property: jobmanager.rpc.address, localhost\nINFO [] - Loading configuration property: jobmanager.rpc.port, 6123\nINFO [] - Loading configuration property: jobmanager.memory.process.size, 1600m\nINFO [] - Loading configuration property: taskmanager.memory.process.size, 1728m\nINFO [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1\nINFO [] - Loading configuration property: parallelism.default, 1\nINFO [] - Loading configuration property: jobmanager.execution.failover-strategy, region\nINFO [] - The derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead\nINFO [] - Final TaskExecutor Memory configuration:\nINFO [] - Total Process Memory: 1.688gb (1811939328 bytes)\nINFO [] - Total Flink Memory: 1.250gb (1342177280 bytes)\nINFO [] - Total JVM Heap Memory: 512.000mb (536870902 bytes)\nINFO [] - Framework: 128.000mb (134217728 bytes)\nINFO [] - Task: 384.000mb (402653174 bytes)\nINFO [] - Total Off-heap Memory: 768.000mb (805306378 bytes)\nINFO [] - Managed: 512.000mb (536870920 bytes)\nINFO [] - Total JVM Direct Memory: 256.000mb (268435458 bytes)\nINFO [] - Framework: 128.000mb (134217728 bytes)\nINFO [] - Task: 0 bytes\nINFO [] - Network: 128.000mb (134217730 bytes)\nINFO [] - JVM Metaspace: 256.000mb (268435456 bytes)\nINFO [] - JVM Overhead: 192.000mb (201326592 bytes)\n\"v\n\bJVM_ARGS\022j -XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456\"\032\n\004HOME\022\022/Users/chenyisheng" 2021-04-28 20:41:25,852 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - 1 Beam Fn Logging clients still connected during shutdown. 2021-04-28 20:41:25,860 WARN org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint. 2021-04-28 20:41:26,058 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mySource]], fields=[a]) -> StreamExecPythonCalc -> Sink: Sink(table=[default_catalog.default_database.mySink], fields=[EXPR$0]) (1/1)#0 (c5ba490b7c3a195979f46aa6b6005ad6) switched from RUNNING to FAILED. java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) ~[?:1.8.0_282] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.11-1.12.0.jar:1.12.0] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:128) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.lang.RuntimeException: Failed to close remote bundle at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:206) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last): File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_282] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_282] at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:206) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:124) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:315) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:425) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.advanceToEndOfEventTime(StreamSource.java:122) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:132) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91) ~[flink-dist_2.11-1.12.0.jar:1.12.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155) ~[flink-dist_2.11-1.12.0.jar:1.12.0] ... 7 more Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last): File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get processor = self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from empty list During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle instruction_id, request.process_bundle_descriptor_id) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get self.data_channel_factory) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__ self.ops = self.create_execution_tree(self.process_bundle_descriptor) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree descriptor.transforms, key=topological_height, reverse=True) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp> (transform_id, get_operation(transform_id)) for transform_id in sorted( File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp> pcoll_id in descriptor.transforms[transform_id].outputs.items() File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]] File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper result = cache[args] = func(*args) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation transform_id, transform_consumers) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation return creator(self, transform_id, transform_proto, payload, consumers) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function operations.ScalarFunctionOperation) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation internal_operation_cls) File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__ File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__ super(ScalarFunctionOperation, self).__init__(spec) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__ self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp> [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs]) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function user_defined_func = pickle.loads(user_defined_function_proto.payload) File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads return cloudpickle.loads(payload) ValueError: unsupported pickle protocol: 5 at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-99e0059768e77661ae18da0b331e3a65:1.12.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282] ... 1 more ``` It is very long, but I believe the gist is: "ValueError: unsupported pickle protocol: 5". To reproduce, go to https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3, then you will find resources.zip, decrypt_fun.py and udf_use_resource.py. Regarding /usr/local/anaconda3/envs/pyflink-quickstart/bin/python, it is basically a conda env, with dependencies: ``` $ conda list # packages in environment at /usr/local/anaconda3/envs/pyflink-quickstart:# # Name Version Build Channel apache-beam 2.23.0 pypi_0 pypi apache-flink 1.12.0 pypi_0 pypi appdirs 1.4.4 pypi_0 pypi attrs 20.3.0 pypi_0 pypi avro-python3 1.9.1 pypi_0 pypi black 20.8b1 pypi_0 pypi ca-certificates 2021.1.19 hecd8cb5_1 certifi 2020.12.5 py37hecd8cb5_0 chardet 4.0.0 pypi_0 pypi click 7.1.2 pypi_0 pypi cloudpickle 1.2.2 pypi_0 pypi confluent-kafka 1.6.0 pypi_0 pypi crcmod 1.7 pypi_0 pypi dill 0.3.1.1 pypi_0 pypi docopt 0.6.2 pypi_0 pypi fastavro 0.23.6 pypi_0 pypi future 0.18.2 pypi_0 pypi grpcio 1.36.1 pypi_0 pypi hdfs 2.6.0 pypi_0 pypi httplib2 0.17.4 pypi_0 pypi idna 2.10 pypi_0 pypi importlib-metadata 3.7.3 pypi_0 pypi iniconfig 1.1.1 pypi_0 pypi jsonpickle 1.2 pypi_0 pypi libcxx 10.0.0 1 libedit 3.1.20191231 h1de35cc_1 libffi 3.3 hb1e8313_2 mock 2.0.0 pypi_0 pypi mypy-extensions 0.4.3 pypi_0 pypi ncurses 6.2 h0a44026_1 numpy 1.19.5 pypi_0 pypi oauth2client 3.0.0 pypi_0 pypi openssl 1.1.1j h9ed2024_0 packaging 20.9 pypi_0 pypi pandas 0.25.3 pypi_0 pypi pathspec 0.8.1 pypi_0 pypi pbr 5.5.1 pypi_0 pypi pip 21.0.1 py37hecd8cb5_0 pluggy 0.13.1 pypi_0 pypi protobuf 3.15.6 pypi_0 pypi py 1.10.0 pypi_0 pypi py4j 0.10.8.1 pypi_0 pypi pyarrow 0.17.1 pypi_0 pypi pyasn1 0.4.8 pypi_0 pypi pyasn1-modules 0.2.8 pypi_0 pypi pydot 1.4.2 pypi_0 pypi pymongo 3.11.3 pypi_0 pypi pyparsing 2.4.7 pypi_0 pypi pytest 6.2.2 pypi_0 pypi python 3.7.10 h88f2d9e_0 python-dateutil 2.8.0 pypi_0 pypi pytz 2021.1 pypi_0 pypi readline 8.1 h9ed2024_0 regex 2021.3.17 pypi_0 pypi requests 2.25.1 pypi_0 pypi rsa 4.7.2 pypi_0 pypi setuptools 52.0.0 py37hecd8cb5_0 six 1.15.0 pypi_0 pypi sqlite 3.33.0 hffcf06c_0 tk 8.6.10 hb0a8c7a_0 toml 0.10.2 pypi_0 pypi typed-ast 1.4.2 pypi_0 pypi typing-extensions 3.7.4.3 pypi_0 pypi urllib3 1.26.3 pypi_0 pypi wheel 0.36.2 pyhd3eb1b0_0 xz 5.2.5 h1de35cc_0 zipp 3.4.1 pypi_0 pypi zlib 1.2.11 h1de35cc_3 ``` Any help? Thanks! Best, Yik San |
Hi, Thanks a lot for reporting this issue. I created a JIRA[1] to describe the root cause of this problem in detail. What's more, there are two possible workaround ways. 1. Client side use Python 3.6 or 3.7. For how to specify the client-side python environment, please refer to the doc[2]. 2. Use Python 3.8 in cluster side as you are using now. [1] https://issues.apache.org/jira/browse/FLINK-22517 [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-client-executable Best, Xingbo Yik San Chan <[hidden email]> 于2021年4月28日周三 下午8:48写道:
|
Thank you! That answers my question.
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
Free forum by Nabble | Edit this page |