Re: Extracting Timestamp in MapFunction
Posted by Biplob Biswas
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html
Hi,
Before trying the method you described above, I tried attaching the timestamp to my data directly at the stream source.
Following is my stream source:
http://pastebin.com/AsXiStMC
and I am using the stream source as follows:
DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));

ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster = tuples.iterate()
        .withFeedbackType(MicroCluster[].class);

//mcStream.broadcast().global();

DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
        .flatMap(new MyCoFlatmap(k,tw))
        .keyBy(1)
        .reduce(new ReduceMC(k))
        .map(new ReturnMC());

inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());
The problem is that when I execute this, all 4 partitions receive the same data. I don't understand why the same data is sent to all 4 partitions, when 4 different data tuples should go to 4 different partitions.
Can you maybe explain this behaviour?