PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function


Rinat
Hi mates!

I'm very new to PyFlink and am trying to register a custom UDF using the Python API.
I've run into an issue in both the server environment and my local IDE environment.

When I try to execute the example below, I get an error message: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'

Of course, I've added the required property to flink-conf.yaml and checked that pyflink-shell.sh initializes the environment using the specified configuration, but it has no effect and I still get the error.

I've also attached my flink-conf.yaml file.

Thanks for your help!

Here is an example:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        [
            "uid", "url"
        ]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
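
The error quoted in this post boils down to a size comparison: the configured task off-heap memory (0 bytes) is smaller than what the Python worker asks for (79 mb, according to the message). A minimal sketch of that arithmetic follows. `parse_memory_size` and `check_off_heap` are hypothetical helpers written for illustration, not Flink's actual implementation, and the unit table is a simplified assumption (binary multiples, a handful of units).

```python
import re

# Simplified unit table (assumption: binary multiples, small subset of units).
_UNITS = {
    "b": 1, "bytes": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
}


def parse_memory_size(text):
    """Convert strings such as '80m', '79 mb' or '0 bytes' into bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-zA-Z]*)\s*", text)
    if match is None:
        raise ValueError(f"cannot parse memory size: {text!r}")
    number = int(match.group(1))
    unit = match.group(2).lower() or "b"  # a bare number is read as bytes
    if unit not in _UNITS:
        raise ValueError(f"unknown unit in memory size: {text!r}")
    return number * _UNITS[unit]


def check_off_heap(configured, required="79 mb"):
    """Return True when the configured off-heap size covers the requirement."""
    return parse_memory_size(configured) >= parse_memory_size(required)


if __name__ == "__main__":
    print(check_off_heap("0 bytes"))  # the value reported in the error message
    print(check_off_heap("80m"))      # a value just above the 79 mb requirement
```

Under these assumptions, any value of at least 79 mb passes the check, which is why a setting such as 80m is enough once it actually reaches the job.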



Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Xingbo Huang
Hi,

You can set the configuration through the API:
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

The flink-conf.yaml setting only takes effect when the job is submitted through flink run. When the job runs in a local minicluster (python xxx.py), flink-conf.yaml is not applied.

Best,
Xingbo

(In reply to Sharipov, Rinat <[hidden email]>, Tue, Oct 13, 2020 at 1:56 AM)



Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Rinat
Hi Xingbo, thanks a lot, it works!

But I still think it's not obvious from a user's point of view that pyflink-shell.sh doesn't use the provided flink-conf.yaml. Don't you think this looks like an issue?

Thanks!

(In reply to Xingbo Huang <[hidden email]>, Tue, Oct 13, 2020 at 05:35)



Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Xingbo Huang
Hi,

From my point of view, pyflink-shell is only an interactive tool. With it, you can choose whether to run the job in a minicluster (similar to python xxx.py) or submit it to the cluster through flink run. For python xxx.py, it is reasonable not to load the configuration from flink-conf.yaml. What do you think?


Best,
Xingbo
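
If a script started with python xxx.py should still honor an existing flink-conf.yaml, one option is to read the file in the script and pass selected entries through the same configuration API. This is only a sketch: `parse_flink_conf` and `apply_conf` are hypothetical helpers, the parser assumes a flat `key: value` layout with `#` comments, and nothing here is done by PyFlink itself.

```python
def parse_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blanks and '#' comments."""
    entries = {}
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        entries[key.strip()] = value.strip()
    return entries


def apply_conf(table_env, entries, prefix="taskmanager.memory."):
    """Copy entries whose key starts with `prefix` into the table config.

    `table_env` only needs get_config().get_configuration().set_string().
    Returns the entries that were actually applied.
    """
    configuration = table_env.get_config().get_configuration()
    applied = {}
    for key, value in entries.items():
        if key.startswith(prefix):
            configuration.set_string(key, value)
            applied[key] = value
    return applied


if __name__ == "__main__":
    sample = """
    # memory settings
    taskmanager.memory.task.off-heap.size: 80m
    jobmanager.rpc.address: localhost
    """
    print(parse_flink_conf(sample))
```

In a real script the text would come from the file itself, for example `parse_flink_conf(open(conf_path).read())`, where `conf_path` is whatever location the installation uses. Restricting the copy to one key prefix is a deliberate choice in this sketch, so that cluster-only settings are not pushed into a local run.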

(In reply to Sharipov, Rinat <[hidden email]>, Tue, Oct 13, 2020 at 2:16 PM)



Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Rinat
The main confusion for me was that pyflink-shell calls the environment configuration script, so I expected it to configure the stream/table environment as well, since that script sets the path to flink-conf.yaml.

Here is a code sample from pyflink-shell:

Screenshot 2020-10-13 at 11.59.08.png



Best,
Rinat

(In reply to Xingbo Huang <[hidden email]>, Tue, Oct 13, 2020 at 10:28)