PyFlink: Upload resource files to Flink cluster

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

PyFlink: Upload resource files to Flink cluster

Sumeet Malhotra
Hi,

I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As of now, my custom Flink image creates a fixed path with the required resource files, but I'd like it to be run time configurable.

There are 2 APIs available to load files when submitting a PyFlink job...

stream_execution_environment.add_python_file() - Recommended to upload files (.py etc) but doesn't let me configure the final path on the target node. The files are added to PYTHONPATH, but it needs the UDTF function to lookup for this file. I'd like to pass the file location into the UDTF instead.

stream_execution_environment.add_python_archive() - Appears to be more generic, in the sense that it allows a target directory to be specified. The documentation doesn't say anything about the contents of the archive, so I'm guessing it could be any type of file. Is this what is needed for my use case?

Or is there any other recommended way to upload non-Python dependencies/resources?

Thanks in advance,
Sumeet
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink: Upload resource files to Flink cluster

Roman Khachatryan
Hi,

I think the second option is what you need. The documentation says
only zip format is supported.
Alternatively, you could upload the files to S3 or other DFS and
access from TMs and re-upload when needed.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives

Regards,
Roman

On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
<[hidden email]> wrote:

>
> Hi,
>
> I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As of now, my custom Flink image creates a fixed path with the required resource files, but I'd like it to be run time configurable.
>
> There are 2 APIs available to load files when submitting a PyFlink job...
>
> stream_execution_environment.add_python_file() - Recommended to upload files (.py etc) but doesn't let me configure the final path on the target node. The files are added to PYTHONPATH, but it needs the UDTF function to lookup for this file. I'd like to pass the file location into the UDTF instead.
>
> stream_execution_environment.add_python_archive() - Appears to be more generic, in the sense that it allows a target directory to be specified. The documentation doesn't say anything about the contents of the archive, so I'm guessing it could be any type of file. Is this what is needed for my use case?
>
> Or is there any other recommended way to upload non-Python dependencies/resources?
>
> Thanks in advance,
> Sumeet
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink: Upload resource files to Flink cluster

Sumeet Malhotra
Thank you Roman. Yes, that's what I am going to do.

But I'm running into another issue... when I specify the --pyArchives option on the command line, the job never gets submitted and is stuck forever. And when I try to programmatically do this by calling add_python_archive(), the job gets submitted but fails because the target directory is not found on the UDF node. Flink is deployed on a K8S cluster in my case and the port 8081 is forwarded to the localhost.

Here's the command line I use:

~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python my_job.py  --pyArchives file:///path/to/schema.zip#schema

And within the UDF I'm access the schema file as:

read_schema('schema/my_schema.json')

Or if I try using the API instead of the command-line, the call looks as:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive('schema.zip', 'schema')

Initially, my_job.py itself had its own command line options, and I was thinking that might interfere with the overall Flink command line options, but even after removing that I'm not able to submit the job anymore. However, if I don't use the --pyArchives option and manually transfer the schema file to a location on the UDF node, the job gets submitted and works as expected.

Any reason why this might happen?

Thanks,
Sumeet


On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <[hidden email]> wrote:
Hi,

I think the second option is what you need. The documentation says
only zip format is supported.
Alternatively, you could upload the files to S3 or other DFS and
access from TMs and re-upload when needed.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives

Regards,
Roman

On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
<[hidden email]> wrote:
>
> Hi,
>
> I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As of now, my custom Flink image creates a fixed path with the required resource files, but I'd like it to be run time configurable.
>
> There are 2 APIs available to load files when submitting a PyFlink job...
>
> stream_execution_environment.add_python_file() - Recommended to upload files (.py etc) but doesn't let me configure the final path on the target node. The files are added to PYTHONPATH, but it needs the UDTF function to lookup for this file. I'd like to pass the file location into the UDTF instead.
>
> stream_execution_environment.add_python_archive() - Appears to be more generic, in the sense that it allows a target directory to be specified. The documentation doesn't say anything about the contents of the archive, so I'm guessing it could be any type of file. Is this what is needed for my use case?
>
> Or is there any other recommended way to upload non-Python dependencies/resources?
>
> Thanks in advance,
> Sumeet
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink: Upload resource files to Flink cluster

Roman Khachatryan
Hi Sumeet,

Probably there is an issue with uploading the archive while submitting the job.
The commands and API usage look good to me.
Dian could you please confirm that?

Regards,
Roman

On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
<[hidden email]> wrote:

>
> Thank you Roman. Yes, that's what I am going to do.
>
> But I'm running into another issue... when I specify the --pyArchives option on the command line, the job never gets submitted and is stuck forever. And when I try to programmatically do this by calling add_python_archive(), the job gets submitted but fails because the target directory is not found on the UDF node. Flink is deployed on a K8S cluster in my case and the port 8081 is forwarded to the localhost.
>
> Here's the command line I use:
>
> ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python my_job.py  --pyArchives file:///path/to/schema.zip#schema
>
> And within the UDF I'm access the schema file as:
>
> read_schema('schema/my_schema.json')
>
> Or if I try using the API instead of the command-line, the call looks as:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_archive('schema.zip', 'schema')
>
> Initially, my_job.py itself had its own command line options, and I was thinking that might interfere with the overall Flink command line options, but even after removing that I'm not able to submit the job anymore. However, if I don't use the --pyArchives option and manually transfer the schema file to a location on the UDF node, the job gets submitted and works as expected.
>
> Any reason why this might happen?
>
> Thanks,
> Sumeet
>
>
> On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <[hidden email]> wrote:
>>
>> Hi,
>>
>> I think the second option is what you need. The documentation says
>> only zip format is supported.
>> Alternatively, you could upload the files to S3 or other DFS and
>> access from TMs and re-upload when needed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>>
>> Regards,
>> Roman
>>
>> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>> <[hidden email]> wrote:
>> >
>> > Hi,
>> >
>> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As of now, my custom Flink image creates a fixed path with the required resource files, but I'd like it to be run time configurable.
>> >
>> > There are 2 APIs available to load files when submitting a PyFlink job...
>> >
>> > stream_execution_environment.add_python_file() - Recommended to upload files (.py etc) but doesn't let me configure the final path on the target node. The files are added to PYTHONPATH, but it needs the UDTF function to lookup for this file. I'd like to pass the file location into the UDTF instead.
>> >
>> > stream_execution_environment.add_python_archive() - Appears to be more generic, in the sense that it allows a target directory to be specified. The documentation doesn't say anything about the contents of the archive, so I'm guessing it could be any type of file. Is this what is needed for my use case?
>> >
>> > Or is there any other recommended way to upload non-Python dependencies/resources?
>> >
>> > Thanks in advance,
>> > Sumeet
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink: Upload resource files to Flink cluster

Sumeet Malhotra
I'm using a standalone deployment on Kubernetes for this use case. Does the archive get uploaded to the cluster via the :8081 REST/WebUI port or via some other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering if not exposing those ports on the local machine might prevent the archive from getting loaded? Although I would have expected an explicit error in that case.

NAMESPACE     NAME               TYPE           PORTS     
flink         flink-jobmanager   ClusterIP      rpc:6123►0 blob-server:6124►0 webui:8081►0

Thanks,
Sumeet


On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan <[hidden email]> wrote:
Hi Sumeet,

Probably there is an issue with uploading the archive while submitting the job.
The commands and API usage look good to me.
Dian could you please confirm that?

Regards,
Roman

On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
<[hidden email]> wrote:
>
> Thank you Roman. Yes, that's what I am going to do.
>
> But I'm running into another issue... when I specify the --pyArchives option on the command line, the job never gets submitted and is stuck forever. And when I try to programmatically do this by calling add_python_archive(), the job gets submitted but fails because the target directory is not found on the UDF node. Flink is deployed on a K8S cluster in my case and the port 8081 is forwarded to the localhost.
>
> Here's the command line I use:
>
> ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python my_job.py  --pyArchives file:///path/to/schema.zip#schema
>
> And within the UDF I'm access the schema file as:
>
> read_schema('schema/my_schema.json')
>
> Or if I try using the API instead of the command-line, the call looks as:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_archive('schema.zip', 'schema')
>
> Initially, my_job.py itself had its own command line options, and I was thinking that might interfere with the overall Flink command line options, but even after removing that I'm not able to submit the job anymore. However, if I don't use the --pyArchives option and manually transfer the schema file to a location on the UDF node, the job gets submitted and works as expected.
>
> Any reason why this might happen?
>
> Thanks,
> Sumeet
>
>
> On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <[hidden email]> wrote:
>>
>> Hi,
>>
>> I think the second option is what you need. The documentation says
>> only zip format is supported.
>> Alternatively, you could upload the files to S3 or other DFS and
>> access from TMs and re-upload when needed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>>
>> Regards,
>> Roman
>>
>> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>> <[hidden email]> wrote:
>> >
>> > Hi,
>> >
>> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As of now, my custom Flink image creates a fixed path with the required resource files, but I'd like it to be run time configurable.
>> >
>> > There are 2 APIs available to load files when submitting a PyFlink job...
>> >
>> > stream_execution_environment.add_python_file() - Recommended to upload files (.py etc) but doesn't let me configure the final path on the target node. The files are added to PYTHONPATH, but it needs the UDTF function to lookup for this file. I'd like to pass the file location into the UDTF instead.
>> >
>> > stream_execution_environment.add_python_archive() - Appears to be more generic, in the sense that it allows a target directory to be specified. The documentation doesn't say anything about the contents of the archive, so I'm guessing it could be any type of file. Is this what is needed for my use case?
>> >
>> > Or is there any other recommended way to upload non-Python dependencies/resources?
>> >
>> > Thanks in advance,
>> > Sumeet
Reply | Threaded
Open this post in threaded view
|

Re: PyFlink: Upload resource files to Flink cluster

Dian Fu
Hi Sumeet,

The archive files will be uploaded to the blob server. This is the same no matter specifying the archives via command line option `—pyArchives` or via `add_python_archive`. 


And when I try to programmatically do this by calling add_python_archive(), the job gets submitted but fails because the target directory is not found on the UDF node.

Could you share some code snippet, e.g. is this a Table API program or a DataStream API program? Besides, could you share the exception stack?

Regards,
Dian

2021年6月11日 下午7:25,Sumeet Malhotra <[hidden email]> 写道:

I'm using a standalone deployment on Kubernetes for this use case. Does the archive get uploaded to the cluster via the :8081 REST/WebUI port or via some other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering if not exposing those ports on the local machine might prevent the archive from getting loaded? Although I would have expected an explicit error in that case.

NAMESPACE     NAME               TYPE           PORTS     
flink         flink-jobmanager   ClusterIP      rpc:6123►0 blob-server:6124►0 webui:8081►0

Thanks,
Sumeet


On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan <[hidden email]> wrote:
Hi Sumeet,

Probably there is an issue with uploading the archive while submitting the job.
The commands and API usage look good to me.
Dian could you please confirm that?

Regards,
Roman

On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
<[hidden email]> wrote:
>
> Thank you Roman. Yes, that's what I am going to do.
>
> But I'm running into another issue... when I specify the --pyArchives option on the command line, the job never gets submitted and is stuck forever. And when I try to programmatically do this by calling add_python_archive(), the job gets submitted but fails because the target directory is not found on the UDF node. Flink is deployed on a K8S cluster in my case and the port 8081 is forwarded to the localhost.
>
> Here's the command line I use:
>
> ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python my_job.py  --pyArchives file:///path/to/schema.zip#schema
>
> And within the UDF I'm access the schema file as:
>
> read_schema('schema/my_schema.json')
>
> Or if I try using the API instead of the command-line, the call looks as:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_archive('schema.zip', 'schema')
>
> Initially, my_job.py itself had its own command line options, and I was thinking that might interfere with the overall Flink command line options, but even after removing that I'm not able to submit the job anymore. However, if I don't use the --pyArchives option and manually transfer the schema file to a location on the UDF node, the job gets submitted and works as expected.
>
> Any reason why this might happen?
>
> Thanks,
> Sumeet
>
>
> On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <[hidden email]> wrote:
>>
>> Hi,
>>
>> I think the second option is what you need. The documentation says
>> only zip format is supported.
>> Alternatively, you could upload the files to S3 or other DFS and
>> access from TMs and re-upload when needed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>>
>> Regards,
>> Roman
>>
>> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>> <[hidden email]> wrote:
>> >
>> > Hi,
>> >
>> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON schema files actually). The path of this file can be passed into the UDTF, but essentially this path needs to exist on the Task Manager node where the task executes. What's the best way to upload these resource files? As of now, my custom Flink image creates a fixed path with the required resource files, but I'd like it to be run time configurable.
>> >
>> > There are 2 APIs available to load files when submitting a PyFlink job...
>> >
>> > stream_execution_environment.add_python_file() - Recommended to upload files (.py etc) but doesn't let me configure the final path on the target node. The files are added to PYTHONPATH, but it needs the UDTF function to lookup for this file. I'd like to pass the file location into the UDTF instead.
>> >
>> > stream_execution_environment.add_python_archive() - Appears to be more generic, in the sense that it allows a target directory to be specified. The documentation doesn't say anything about the contents of the archive, so I'm guessing it could be any type of file. Is this what is needed for my use case?
>> >
>> > Or is there any other recommended way to upload non-Python dependencies/resources?
>> >
>> > Thanks in advance,
>> > Sumeet