Reading Parameter values sent to partition

Biplob Biswas
Hi,

I am trying to send some static integer values down to each map function, using the following code:

        public static void main(String[] args) throws Exception {

                ParameterTool params = ParameterTool.fromArgs(args);

                String filePath = params.getRequired("path");
                int k = params.getInt("k");

                Configuration mapConf = new Configuration();
                mapConf.setInteger("numberofMC", k);

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
                tuples.flatMap(new MyFlatmap());
        }

-------------------------------------------------------------------------------------------------------
public static final class MyFlatmap extends RichFlatMapFunction<Point, Tuple2<MicroCluster[], Integer>> {
                int numofMC = 5;

                public MyFlatmap() {
                }

                @Override
                public void open(Configuration parameters) throws Exception {
                        super.open(parameters);

                        // falls back to -1 when "numberofMC" is not present in parameters
                        numofMC = parameters.getInteger("numberofMC", -1);
                        System.out.println(numofMC);
                }

                @Override
                public void flatMap(Point in, Collector<Tuple2<MicroCluster[], Integer>> out) throws Exception {
                }
}


But when I do the above to read the value of numberofMC, it does not reach the map functions, and I get the default value of -1 instead.

What could be the reason behind this?

Re: Reading Parameter values sent to partition

Chesnay Schepler
There are 2 flaws in your code:

Let's start with the fundamental one:

At no point do you associate your mapConf with the flatMap or even the job.

Theoretically, you should attach it to the flatMap using the
flatMap(...).withParameters(mapConf) method.

But here is the second, more subtle flaw:

The withParameters() method does not work at all in the streaming API; it is only supported by the batch (DataSet) API.

The solution to your problem is to pass your parameter k into the
constructor of MyFlatmap and store it in a field.
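
A minimal sketch of the constructor approach, using only the JDK so it can be run without Flink on the classpath. It relies on the fact that Flink user functions are java.io.Serializable and are shipped to the parallel tasks as serialized objects, so a field set in the constructor travels with the function. The class ConstructorParamSketch, the roundTrip helper, and the value 7 are hypothetical stand-ins for illustration; in the real job MyFlatmap would still extend RichFlatMapFunction and be used as tuples.flatMap(new MyFlatmap(k)).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ConstructorParamSketch {

    // Stand-in for the user function (assumption: the real class extends
    // RichFlatMapFunction<Point, Tuple2<MicroCluster[], Integer>>, which is Serializable).
    static final class MyFlatmap implements Serializable {
        private static final long serialVersionUID = 1L;

        // Set once in the constructor; serialized together with the function object.
        private final int numOfMC;

        MyFlatmap(int numOfMC) {
            this.numOfMC = numOfMC;
        }

        int getNumOfMC() {
            return numOfMC;
        }
    }

    // Hypothetical helper: serializes and deserializes the function with plain
    // Java serialization, mimicking how a function instance is shipped to a task.
    static MyFlatmap roundTrip(MyFlatmap function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MyFlatmap) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        int k = 7; // hypothetical value; in the job it comes from params.getInt("k")
        MyFlatmap shipped = roundTrip(new MyFlatmap(k));
        System.out.println(shipped.getNumOfMC()); // prints 7
    }
}
```

With this approach the open() method no longer needs to read anything from its Configuration argument; the field is already populated when the function starts on each parallel instance.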

Regards,
Chesnay

On 28.05.2016 14:11, Biplob Biswas wrote:

> Hi,
>
> I am trying to send some static integer values down to each map function,
> using the following code
>
> public static void main(String[] args) throws Exception {
>
> ParameterTool params = ParameterTool.fromArgs(args);
>
> String filePath = params.getRequired("path");
> int k = params.getInt("k");
>
> Configuration mapConf = new Configuration();
> mapConf.setInteger("numberofMC", k);
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath,
> streamSpeed));
>                  tuples.flatMap(new MyFlatmap())
>           }
>
> -------------------------------------------------------------------------------------------------------
> public static final class MyFlatmap extends RichFlatMapFunction<Point,
> Tuple2<MicroCluster[],Integer>>{
> int numofMC = 5;
> public MyCoFlatmap() {
> }
>
> public void open(Configuration parameters) throws Exception {
>      super.open(parameters);
>
>      numofMC = parameters.getInteger("numberofMC",-1);
> System.out.println(numofMC);
>
>    }
>
> @Override
> public void flatMap(Point in, Collector<Tuple2<MicroCluster[],
> Integer>> out) throws Exception {
>                 }
>
>
> But when I do the above to read the value of numberofMC, it does not reach
> the map functions, and I get the default value of -1 instead.
>
> What could be the reason behind this?
>


Re: Reading Parameter values sent to partition

Biplob Biswas
Ah, thanks a lot for that insight. I am pretty new to Flink and learning on my own, so I am prone to making mistakes.

Thanks a lot for helping.