Hi,
I Need some assistance - I’m trying to globally register arguments from my main function for further extraction on stream processing nodes. My code base is Scala: val env = StreamExecutionEnvironment.getExecutionEnvironment val parameterTool = ParameterTool.fromArgs(args)
but when I'm trying to retrieve them I get null pointer exception. private lazy val parameters: GlobalJobParameters = ExecutionEnvironment.getExecutionEnvironment.getConfig.getGlobalJobParameters Have read this article and I’m curious if it is possible. This is required in order to read from a configuration file holding DB Connection String (Bootstrapping DB connections on each processing node on the bootstrapping phase. Regards, Chen. |
Hello Chen,
you can access the set configuration in your rich function like this:
R egards,Chesnay On 17.07.2016 18:22, Chen Bekor wrote:
|
I would discourage from using the GlobalJobParameters. I think their
main purpose was to display configuration keys on the web interface. Instead, I would simply do it as a class field which you set in the constructor. public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private final String dbConnect; public Tokenizer(String dbConnect) { this.dbConnect = Preconditions.checkNotNull(dbConnect, "Input"); } <...> } This way you also check the arguments before submitting the job and not when the flat map is being executed. – Ufuk On Sun, Jul 17, 2016 at 7:16 PM, Chesnay Schepler <[hidden email]> wrote: > Hello Chen, > > you can access the set configuration in your rich function like this: > > public static final class Tokenizer extends RichFlatMapFunction<String, > Tuple2<String, Integer>> { > @Override > public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { > ParameterTool parameters = (ParameterTool) > getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); > parameters.getRequired("input"); > // .. do more ... > > Regards, > Chesnay > > > On 17.07.2016 18:22, Chen Bekor wrote: > > Hi, > > I Need some assistance - > > I’m trying to globally register arguments from my main function for further > extraction on stream processing nodes. My code base is Scala: > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > val parameterTool = ParameterTool.fromArgs(args) > > env.getConfig.setGlobalJobParameters(parameterTool) > > but when I'm trying to retrieve them I get null pointer exception. > > private lazy val parameters: GlobalJobParameters = > ExecutionEnvironment.getExecutionEnvironment.getConfig.getGlobalJobParameters > > Have read this article > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/best_practices.html > and I’m curious if it is possible. > > This is required in order to read from a configuration file holding DB > Connection String (Bootstrapping DB connections on each processing node on > the bootstrapping phase. > > Regards, > Chen. > > |
PS: Please provide a subject line in the future, it makes it easier
for the community to asses whether they can help or not without looking into the message. On Mon, Jul 18, 2016 at 11:14 AM, Ufuk Celebi <[hidden email]> wrote: > I would discourage from using the GlobalJobParameters. I think their > main purpose was to display configuration keys on the web interface. > > Instead, I would simply do it as a class field which you set in the constructor. > > public static final class Tokenizer extends > RichFlatMapFunction<String, Tuple2<String, Integer>> { > > private final String dbConnect; > > public Tokenizer(String dbConnect) { > this.dbConnect = Preconditions.checkNotNull(dbConnect, "Input"); > } > > <...> > } > > This way you also check the arguments before submitting the job and > not when the flat map is being executed. > > – Ufuk > > > On Sun, Jul 17, 2016 at 7:16 PM, Chesnay Schepler <[hidden email]> wrote: >> Hello Chen, >> >> you can access the set configuration in your rich function like this: >> >> public static final class Tokenizer extends RichFlatMapFunction<String, >> Tuple2<String, Integer>> { >> @Override >> public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { >> ParameterTool parameters = (ParameterTool) >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); >> parameters.getRequired("input"); >> // .. do more ... >> >> Regards, >> Chesnay >> >> >> On 17.07.2016 18:22, Chen Bekor wrote: >> >> Hi, >> >> I Need some assistance - >> >> I’m trying to globally register arguments from my main function for further >> extraction on stream processing nodes. My code base is Scala: >> >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> >> val parameterTool = ParameterTool.fromArgs(args) >> >> env.getConfig.setGlobalJobParameters(parameterTool) >> >> but when I'm trying to retrieve them I get null pointer exception. >> >> private lazy val parameters: GlobalJobParameters = >> ExecutionEnvironment.getExecutionEnvironment.getConfig.getGlobalJobParameters >> >> Have read this article >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/best_practices.html >> and I’m curious if it is possible. >> >> This is required in order to read from a configuration file holding DB >> Connection String (Bootstrapping DB connections on each processing node on >> the bootstrapping phase. >> >> Regards, >> Chen. >> >> |
Free forum by Nabble | Edit this page |