Hi,
I am trying to send some static integer values down to each map function, using the following code public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); String filePath = params.getRequired("path"); int k = params.getInt("k"); Configuration mapConf = new Configuration(); mapConf.setInteger("numberofMC", k); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed)); tuples.flatMap(new MyFlatmap()) } ------------------------------------------------------------------------------------------------------- public static final class MyFlatmap extends RichFlatMapFunction<Point, Tuple2<MicroCluster[],Integer>>{ int numofMC = 5; public MyCoFlatmap() { } public void open(Configuration parameters) throws Exception { super.open(parameters); numofMC = parameters.getInteger("numberofMC",-1); System.out.println(numofMC); } @Override public void flatMap(Point in, Collector<Tuple2<MicroCluster[], Integer>> out) throws Exception { } but when i do the above things to get the value of numberofMC, i dont get it to the map funcitons and it returns me the default value of -1. What could be the reason behind this? |
There are 2 flaws in your code:
Let's start with the fundamental one: At no point do you associate your mapConf with the flatMap or even the job. THeoretically you should add it to the flatMap using flatMap(...).withConfiguration(mapConf) method. But here's is the second a more subtle flaw: the withConfiguration() method does not work at all in the streaming API. The solution to your problem is to pass your parameter k into the constructor of MyFlatMap and store it in a field. Regards, Chesnay On 28.05.2016 14:11, Biplob Biswas wrote: > Hi, > > I am trying to send some static integer values down to each map function, > using the following code > > public static void main(String[] args) throws Exception { > > ParameterTool params = ParameterTool.fromArgs(args); > > String filePath = params.getRequired("path"); > int k = params.getInt("k"); > > Configuration mapConf = new Configuration(); > mapConf.setInteger("numberofMC", k); > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, > streamSpeed)); > tuples.flatMap(new MyFlatmap()) > } > > ------------------------------------------------------------------------------------------------------- > public static final class MyFlatmap extends RichFlatMapFunction<Point, > Tuple2<MicroCluster[],Integer>>{ > int numofMC = 5; > public MyCoFlatmap() { > } > > public void open(Configuration parameters) throws Exception { > super.open(parameters); > > numofMC = parameters.getInteger("numberofMC",-1); > System.out.println(numofMC); > > } > > @Override > public void flatMap(Point in, Collector<Tuple2<MicroCluster[], > Integer>> out) throws Exception { > } > > > but when i do the above things to get the value of numberofMC, i dont get it > to the map funcitons and it returns me the default value of -1. > > What could be the reason behind this? > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-Parameter-values-sent-to-partition-tp7228.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > |
Aah, thanks a lot for that insight. Pretty new to the Flink systems and learning on my own so prone to making mistakes.
Thanks a lot for helping. |
Free forum by Nabble | Edit this page |