passing additional jvm parameters to the configuration

Georg Heiler
Hi,

How can I pass additional configuration parameters, like Spark's extraJavaOptions, to a Flink job?


The linked post contains the details, but the gist is:
flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" \
  -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"

does not pass -Dconfig.file to the Flink job!

Best,
Georg
Re: passing additional jvm parameters to the configuration

Arvid Heise
Hi Georg,

Could you check whether simply using -D works, as described here [1]?

If not, could you please be more precise: do you want the parameter to be passed to the driver, the job manager, or the task managers?
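
For the -D check, I mean roughly this (syntax from memory, so treat it as a sketch; [1] has the authoritative description):

flink run -D env.java.opts="-Dconfig.file=config/jobs/twitter-analysis.conf" \
  --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"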


Re: passing additional jvm parameters to the configuration

Georg Heiler
Hi Arvid,

thanks for the quick reply. I have a strong Apache Spark background. There, when executing on YARN (or locally), the cluster is usually created on demand for the duration of the batch/streaming job.
Spark only has the concepts of A) master/driver (application master), B) slave/executor, and C) driver: the node where the main class is invoked. In Spark's notion, I want the -D parameter to be available on the (C) driver node. Translating this to Flink, I want it to be available to the main class that is invoked when the job is submitted/started by the job manager (which should be the equivalent of the driver).

But maybe my understanding of Flink is not 100% correct yet.

Unfortunately, using -D directly does not work.

Best,
Georg

Re: passing additional jvm parameters to the configuration

Arvid Heise
Hi Georg,

thank you for your detailed explanation. You want to use env.java.opts [1]. There are flavors if you only want to make it available on the job manager or the task managers, but I guess the basic form is good enough for you.
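
In flink-conf.yaml that could look roughly like this (the config file path is a placeholder; the per-component keys are the flavors I mentioned):

cat >> "$FLINK_HOME/conf/flink-conf.yaml" <<'EOF'
# JVM options for all Flink processes:
env.java.opts: -Dconfig.file=/path/to/twitter-analysis.conf
# Flavors, if only one component should receive them:
# env.java.opts.jobmanager: ...
# env.java.opts.taskmanager: ...
EOF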


Re: passing additional jvm parameters to the configuration

Georg Heiler
Hi,

But how can I change/configure it per submitted job, rather than for the whole cluster?

Best,
Georg

Re: passing additional jvm parameters to the configuration

Arvid Heise
Hi Georg,

I think there is a conceptual misunderstanding. If you reuse the cluster for several jobs, they need to share the JVM args, since it's the same process [1]. On Spark, new processes are spawned for each stage, afaik.

However, the current recommendation is to use one ad-hoc cluster per job/application (which is closer to how Spark works). So if you use YARN, every job/application spawns a new cluster that is sized just right for it. You can then supply new parameters for each YARN submission with:
flink run -m yarn-cluster -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" \
  --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"

However, make sure that the path is accessible from within your YARN cluster, since the driver is probably executed on the cluster (not 100% sure).


If you want per-job configuration on a shared cluster, I'd recommend using normal program parameters and initializing PureConfig manually (I haven't used it, so I'm not sure how). Then you'd probably invoke your program as follows:
flink run \
  --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" config.file='config/jobs/twitter-analysis.conf'


For local execution, I had some trouble configuring it as well (I tried it with your code). The issue is that all the parameters we tried so far are only passed to newly spawned processes, while your main class is executed directly in the CLI process.

FLINK_ENV_JAVA_OPTS="-Dconfig.file=$(pwd)/config/jobs/twitter-analysis.conf" flink run \
  --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"

FLINK_ENV_JAVA_OPTS is usually populated from env.java.opts in flink-conf.yaml, but it doesn't respect -Denv.java.opts passed on the command line. I'm not sure whether this is intentional.


If you can put env.java.opts into flink-conf.yaml, it will most likely work for both YARN and local execution. With FLINK_CONF_DIR you can set a different conf dir per job. Alternatively, you could also specify both FLINK_ENV_JAVA_OPTS and -yD to inject the property.
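
As a sketch (paths assumed, untested): copy the conf dir per job, append the option, and point FLINK_CONF_DIR at the copy:

cp -r "$FLINK_HOME/conf" /tmp/twitter-analysis-conf
# Append the job-specific JVM option to the copied config:
echo 'env.java.opts: -Dconfig.file=/path/to/twitter-analysis.conf' \
  >> /tmp/twitter-analysis-conf/flink-conf.yaml
# Use the per-job conf dir for this submission only:
FLINK_CONF_DIR=/tmp/twitter-analysis-conf flink run \
  --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"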



Re: passing additional jvm parameters to the configuration

Georg Heiler
Thanks a lot!
You are right.

For the mental model to be comparable, one cluster per job should be assumed.

In particular for YARN:
-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"

You mentioned that the path must be accessible. Spark has a --files parameter, with which a local file is automatically copied to the root of the YARN container. Is something similar available in Flink?

Best,
Georg

Re: passing additional jvm parameters to the configuration

Arvid Heise
You are welcome.

I'm not an expert on the YARN executor, but I hope that

     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)

can help [1]. Oddly, this option is not listed on the YARN page, but it should be available, as it's also used in the SSL setup [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/security-ssl.html#tips-for-yarn--mesos-deployment
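
Combined with -yD, the submission might look like this (I'm not sure where exactly the shipped files land in the container's working directory, so the config path may need adjusting):

# Ship the local config directory to the YARN containers, then point the JVM at it:
flink run -m yarn-cluster \
  -yt config/jobs \
  -yD env.java.opts="-Dconfig.file=twitter-analysis.conf" \
  --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"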

Re: passing additional jvm parameters to the configuration

Georg Heiler

Your suggestion seems to work well.

Best,
Georg
