(no subject)

Chen Bekor
Hi,

I need some assistance.

I’m trying to globally register arguments from my main function for further extraction on stream processing nodes. My code base is Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val parameterTool = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(parameterTool)

But when I try to retrieve them, I get a NullPointerException:

private lazy val parameters: GlobalJobParameters = ExecutionEnvironment.getExecutionEnvironment.getConfig.getGlobalJobParameters

I have read this article:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/best_practices.html
and I’m curious whether this is possible.

This is required in order to read from a configuration file holding a DB connection string (bootstrapping DB connections on each processing node during the bootstrapping phase).

Regards,
Chen.

Re:

Chesnay Schepler
Hello Chen,

You can access the configuration you set from inside your rich function like this:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		// Global job parameters registered on the ExecutionConfig in main()
		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
		parameters.getRequired("input");
		// .. do more ...
	}
}

Regards,
Chesnay

Re:

Ufuk Celebi
I would discourage using GlobalJobParameters. I think their
main purpose was to display configuration keys on the web interface.

Instead, I would simply pass the value in a class field that you set in the constructor.

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private final String dbConnect;

    public Tokenizer(String dbConnect) {
        this.dbConnect = Preconditions.checkNotNull(dbConnect, "Input");
    }

    <...>
}

This way you also check the arguments before submitting the job and
not when the flat map is being executed.

– Ufuk
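
To illustrate why the constructor approach works, here is a minimal plain-Java sketch with no Flink dependency. It assumes that the function object is shipped to the workers by Java serialization, so a field set in the constructor travels with it. DbTokenizer, shipToWorker and the JDBC URL are hypothetical names for illustration only, and Objects.requireNonNull stands in for Flink's Preconditions.checkNotNull.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for a Flink rich function; this is not a Flink class.
final class DbTokenizer implements Serializable {
    private static final long serialVersionUID = 1L;

    // Set once on the client; travels with the serialized function object.
    private final String dbConnect;

    DbTokenizer(String dbConnect) {
        // Fails here, on the client, if the argument is missing.
        this.dbConnect = Objects.requireNonNull(dbConnect, "dbConnect");
    }

    String dbConnect() {
        return dbConnect;
    }
}

final class ConstructorConfigSketch {
    // Simulates shipping a function to a worker: serialize, then deserialize.
    static <T extends Serializable> T shipToWorker(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        DbTokenizer onClient = new DbTokenizer("jdbc:example://host/db");
        DbTokenizer onWorker = shipToWorker(onClient);
        // The copy on the "worker" still carries the value set on the client.
        System.out.println(onWorker.dbConnect());
    }
}
```

In this sketch a missing connection string throws in the constructor, before anything is serialized, which mirrors the point about checking arguments before the job is submitted. In a real job the connection itself would probably be opened on the worker from this string rather than created on the client.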


Re:

Ufuk Celebi
PS: Please provide a subject line in the future. It makes it easier
for the community to assess whether they can help without
looking into the message.
