Customization of execution environment

Customization of execution environment

Flavio Pompermaier
Hi all,
while migrating to Flink 1.11 I tried to customize the execution environment this way:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bte = BatchTableEnvironment.create(env);
final Configuration conf = bte.getConfig().getConfiguration();
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);
conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");
conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");
conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));

These settings seem to have no effect on my local environment; to customize it during tests I have to create the env as a new LocalEnvironment:

final Configuration conf = env.getConfiguration();
conf.setLong(...)
env = new LocalEnvironment(conf);

Is this the desired behaviour, or is it a bug?
Wouldn't it be possible to allow customization of the env config, given that it's actually editable?

Best,
Flavio

Re: Customization of execution environment

rmetzger0
Hi Flavio,

I think the recommended approach is as follows (then you don't need to create two environments):

final Configuration conf = new Configuration();
conf.setLong(...)
env = new LocalEnvironment(conf);

I agree that in theory it would be nicer if the configuration returned was editable, but the handling of configs in Flink is pretty involved already.


On Tue, Jul 28, 2020 at 10:13 AM Flavio Pompermaier <[hidden email]> wrote:

Re: Customization of execution environment

Arvid Heise-3
I'm not entirely sure I completely understand the interaction of BTE and ExecEnv, but I'd create it this way:
Configuration conf = new Configuration();
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
BatchTableEnvironment bte = BatchTableEnvironment.create(env);

On Wed, Jul 29, 2020 at 8:14 AM Robert Metzger <[hidden email]> wrote:

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Customization of execution environment

Flavio Pompermaier
That's fine, and it's basically what I do as well. I was arguing that it's bad (IMHO) that you can access the config from the BatchTableEnvironment (via bte.getConfig().getConfiguration()).
You reasonably think that you are customizing the env, but that's illusory. You should not be able to set properties if they are read-only.
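
A plain-Java sketch of the pitfall described here, not Flink code. ToyEnvironment, ToyTableEnvironment and the key "example.timeout" are hypothetical stand-ins, and the sketch assumes that the environment consumes its settings when it is constructed while the table layer hands out a separate, mutable settings object of its own. Under that assumption, writes to the table-level object are accepted silently but never reach the environment:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an execution environment: it copies its
// settings once, at construction time.
class ToyEnvironment {
    private final Map<String, String> settings;

    ToyEnvironment(Map<String, String> initial) {
        this.settings = new HashMap<>(initial); // snapshot taken here
    }

    String get(String key, String defaultValue) {
        return settings.getOrDefault(key, defaultValue);
    }
}

// Hypothetical stand-in for a table environment that keeps its own,
// separate and mutable configuration object.
class ToyTableEnvironment {
    private final ToyEnvironment env;
    private final Map<String, String> tableSettings = new HashMap<>();

    ToyTableEnvironment(ToyEnvironment env) {
        this.env = env;
    }

    Map<String, String> getConfiguration() {
        return tableSettings;
    }

    ToyEnvironment getEnvironment() {
        return env;
    }
}

class IllusoryConfigDemo {
    // Sets the key on the table-level config after the environment exists,
    // then reports what the environment actually sees.
    static String setLateAndRead(String key, String value) {
        ToyTableEnvironment tableEnv =
                new ToyTableEnvironment(new ToyEnvironment(new HashMap<>()));
        tableEnv.getConfiguration().put(key, value); // accepted without complaint
        return tableEnv.getEnvironment().get(key, "default");
    }

    // Puts the key into the settings before the environment is created.
    static String setEarlyAndRead(String key, String value) {
        Map<String, String> conf = new HashMap<>();
        conf.put(key, value);
        return new ToyEnvironment(conf).get(key, "default");
    }

    public static void main(String[] args) {
        System.out.println(setLateAndRead("example.timeout", "10 min"));  // prints "default"
        System.out.println(setEarlyAndRead("example.timeout", "10 min")); // prints "10 min"
    }
}
```

The second method mirrors the workaround suggested earlier in the thread: build the settings first, then create the environment from them.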

Cheers,
Flavio


On Thu, Jul 30, 2020 at 12:15 PM Arvid Heise <[hidden email]> wrote:


Re: Customization of execution environment

Aljoscha Krettek
I agree! My long-term goal is that a Configuration is the basis of truth
and that the programmatic setter methods and everything else just modify
the underlying configuration.

We have made big steps towards making it possible to configure most (if
not all) StreamExecutionEnvironment and TableEnvironment settings via a
Configuration, but we're not completely there yet.

To me it's not yet clear whether modifications on the Configuration of
the TableEnvironment should go back to the Configuration of the
StreamExecutionEnvironment. It might be that some users find it
surprising that changes propagate.
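
To make the two ideas above concrete, here is a minimal plain-Java sketch, not Flink code. BackedEnvironment, PropagationDemo and the "parallelism" key are hypothetical. The typed setter only writes into a backing map, so the map is the single place where settings live, and the `shared` flag switches between the two behaviours under discussion: a derived environment that shares its parent's map (changes propagate back) or one that works on a copy (changes stay local).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical environment whose typed setters only write into a backing
// map, so the map remains the single place where settings are stored.
class BackedEnvironment {
    private final Map<String, String> conf;

    BackedEnvironment(Map<String, String> conf) {
        this.conf = conf;
    }

    void setParallelism(int parallelism) {
        conf.put("parallelism", Integer.toString(parallelism));
    }

    int getParallelism() {
        return Integer.parseInt(conf.getOrDefault("parallelism", "1"));
    }
}

class PropagationDemo {
    // shared == true:  the derived environment uses the parent's map,
    //                  so its changes propagate back to the parent.
    // shared == false: the derived environment gets a copy,
    //                  so its changes stay local.
    static int parentParallelismAfterChildUpdate(boolean shared) {
        Map<String, String> parentConf = new HashMap<>();
        BackedEnvironment parent = new BackedEnvironment(parentConf);
        parent.setParallelism(2);

        Map<String, String> childConf = shared ? parentConf : new HashMap<>(parentConf);
        BackedEnvironment child = new BackedEnvironment(childConf);
        child.setParallelism(8);

        return parent.getParallelism();
    }

    public static void main(String[] args) {
        System.out.println(parentParallelismAfterChildUpdate(true));  // prints 8
        System.out.println(parentParallelismAfterChildUpdate(false)); // prints 2
    }
}
```

Neither variant is obviously right: the shared map avoids silently ignored settings, while the copy avoids a parent changing behind the caller's back.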

Best,
Aljoscha



On 30.07.20 15:41, Flavio Pompermaier wrote:
