Re: Extracting Timestamp in MapFunction

Posted by Biplob Biswas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html

Hi,

Before trying the method you described above, I tried adding the timestamp to my data directly at the stream source.
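
To make clear what I mean by attaching the timestamp at the source, here is a minimal plain-Java sketch. It is not the actual DataStreamGenerator and it does not use any Flink API; the names TimestampAtSourceSketch, TimestampedPoint and emitAll, as well as the comma-separated input format, are assumptions made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch: not the real DataStreamGenerator and not Flink code.
final class TimestampAtSourceSketch {

    // Hypothetical record type: a point plus the time at which the source emitted it.
    static final class TimestampedPoint {
        final double[] features;
        final long timestamp;

        TimestampedPoint(double[] features, long timestamp) {
            this.features = features;
            this.timestamp = timestamp;
        }
    }

    // Parses each comma-separated line (assumed input format) and stamps it
    // with the clock value read at the moment the record is emitted.
    static List<TimestampedPoint> emitAll(List<String> lines, LongSupplier clock) {
        List<TimestampedPoint> out = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            double[] features = new double[parts.length];
            for (int i = 0; i < parts.length; i++) {
                features[i] = Double.parseDouble(parts[i].trim());
            }
            out.add(new TimestampedPoint(features, clock.getAsLong()));
        }
        return out;
    }
}
```

In a real source the clock would presumably be System::currentTimeMillis; it is passed in as a parameter here only so that the sketch is easy to check.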

Following is my stream source:

http://pastebin.com/AsXiStMC

and I am using the stream source as follows:

DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster = tuples.iterate()
        .withFeedbackType(MicroCluster[].class);
//mcStream.broadcast().global();
DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
        .flatMap(new MyCoFlatmap(k, tw))
        .keyBy(1)
        .reduce(new ReduceMC(k))
        .map(new ReturnMC());

inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

The problem is that when I execute this, all 4 partitions receive the same data. I don't understand why the same data is sent to all 4 partitions, when 4 different tuples should go to 4 different partitions.

Can you maybe explain this behaviour?