Reading Parameter values sent to partition


Biplob Biswas

I am trying to send some static integer values down to each map function, using the following code:

        public static void main(String[] args) throws Exception {
                ParameterTool params = ParameterTool.fromArgs(args);
                String filePath = params.getRequired("path");
                int k = params.getInt("k");
                // The value is stored in a Configuration object ...
                Configuration mapConf = new Configuration();
                mapConf.setInteger("numberofMC", k);

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
                // ... but mapConf is never attached to the flatMap or the job.
                tuples.flatMap(new MyFlatmap());
        }

        public static final class MyFlatmap extends RichFlatMapFunction<Point, Tuple2<MicroCluster[], Integer>> {
                int numofMC = 5;

                public MyFlatmap() {
                }

                @Override
                public void open(Configuration parameters) throws Exception {
                        // Falls back to the default (-1) when the key is missing.
                        numofMC = parameters.getInteger("numberofMC", -1);
                        System.out.println(numofMC);
                }

                @Override
                public void flatMap(Point in, Collector<Tuple2<MicroCluster[], Integer>> out) throws Exception {
                }
        }

But when I do this, the value of numberofMC does not reach the map functions; open() only gives me the default value of -1.

What could be the reason behind this?

Re: Reading Parameter values sent to partition

Chesnay Schepler
There are 2 flaws in your code:

Let's start with the fundamental one:

At no point do you associate your mapConf with the flatMap or even the job.

Theoretically, you would attach it to the flatMap by calling
flatMap(...).withParameters(mapConf).

But here is the second, more subtle flaw:

withParameters() belongs to the batch (DataSet) API and does not work at all in the streaming API.

The solution to your problem is to pass your parameter k into the
constructor of MyFlatmap and store it in a field.
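
The constructor approach works because the function object itself is serialized and shipped to the workers, so any serializable field travels with it. The following is only a minimal sketch of that mechanism in plain Java, without Flink on the classpath: the FlatMapper interface is a hypothetical stand-in for RichFlatMapFunction, roundTrip() merely imitates shipping the object, and the Integer types and the toy flatMap logic are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ConstructorParamSketch {

    /** Hypothetical stand-in for Flink's RichFlatMapFunction; not a Flink type. */
    interface FlatMapper<I, O> extends Serializable {
        void flatMap(I in, List<O> out);
    }

    /** The parameter lives in a field, so it is part of the serialized object. */
    static final class MyFlatmap implements FlatMapper<Integer, Integer> {
        private static final long serialVersionUID = 1L;
        private final int numOfMC;

        MyFlatmap(int numOfMC) {
            this.numOfMC = numOfMC;
        }

        int getNumOfMC() {
            return numOfMC;
        }

        @Override
        public void flatMap(Integer in, List<Integer> out) {
            // Toy logic: emit the input scaled by the configured value.
            out.add(in * numOfMC);
        }
    }

    /** Serializes and deserializes the function, imitating shipment to a worker. */
    static <T extends Serializable> T roundTrip(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(function);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) ois.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int k = 7; // in the original post this comes from params.getInt("k")
        MyFlatmap shipped = roundTrip(new MyFlatmap(k));
        List<Integer> out = new ArrayList<>();
        shipped.flatMap(3, out);
        System.out.println(shipped.getNumOfMC() + " " + out); // prints "7 [21]"
    }
}
```

In the job from the question, the equivalent change would be to give MyFlatmap a constructor taking an int, keep the value in a field, and call tuples.flatMap(new MyFlatmap(k)); the open() lookup on the Configuration is then no longer needed.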


On 28.05.2016 14:11, Biplob Biswas wrote:

> Hi,
> I am trying to send some static integer values down to each map function,
> using the following code:
> public static void main(String[] args) throws Exception {
>     ParameterTool params = ParameterTool.fromArgs(args);
>     String filePath = params.getRequired("path");
>     int k = params.getInt("k");
>     Configuration mapConf = new Configuration();
>     mapConf.setInteger("numberofMC", k);
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
>     tuples.flatMap(new MyFlatmap());
> }
> -------------------------------------------------------------------------------------------------------
> public static final class MyFlatmap extends RichFlatMapFunction<Point, Tuple2<MicroCluster[], Integer>> {
>     int numofMC = 5;
>     public MyFlatmap() {
>     }
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         numofMC = parameters.getInteger("numberofMC", -1);
>         System.out.println(numofMC);
>     }
>     @Override
>     public void flatMap(Point in, Collector<Tuple2<MicroCluster[], Integer>> out) throws Exception {
>     }
> }
> But when I do this, the value of numberofMC does not reach the map
> functions; open() only gives me the default value of -1.
> What could be the reason behind this?


Re: Reading Parameter values sent to partition

Biplob Biswas
Aah, thanks a lot for that insight. I'm pretty new to Flink and learning on my own, so I'm prone to making mistakes.

Thanks a lot for helping.