Re: Extracting Timestamp in MapFunction
Posted by
Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7478.html
Hi,
could you try pulling the problem apart, i.e. determine at which point in the pipeline the duplicate data first appears? Is it after the sources, in the CoFlatMap, or in the Map after the reduce, for example?
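A minimal sketch of one way to narrow this down. It is plain Java rather than Flink code, and DuplicateProbe and its methods are hypothetical names made up for illustration. The idea is to note, for each stage, which parallel subtask saw which element, and then look for the first stage where the same element shows up on more than one subtask. In an actual job the (stage, subtask, element) triples would have to come from your own logging, for example from a pass-through RichMapFunction that reads getRuntimeContext().getIndexOfThisSubtask(); that wiring is assumed here and not shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical helper (not part of Flink): remembers which subtask saw
 * which element at each named stage of a pipeline.
 */
final class DuplicateProbe {
    // stage name -> element -> indices of the subtasks that saw that element
    private final Map<String, Map<String, Set<Integer>>> seen = new LinkedHashMap<>();

    /** Record that the given subtask observed the element at the given stage. */
    synchronized void record(String stage, int subtaskIndex, String element) {
        seen.computeIfAbsent(stage, s -> new LinkedHashMap<>())
            .computeIfAbsent(element, e -> new TreeSet<>())
            .add(subtaskIndex);
    }

    /** Elements of the stage that were observed by more than one subtask. */
    synchronized List<String> duplicatedAt(String stage) {
        Map<String, Set<Integer>> perElement =
            seen.getOrDefault(stage, Collections.<String, Set<Integer>>emptyMap());
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<Integer>> entry : perElement.entrySet()) {
            if (entry.getValue().size() > 1) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /**
     * First stage, in the order stages were first recorded, where some
     * element reached several subtasks; null if there is no such stage.
     */
    synchronized String firstStageWithDuplicates() {
        for (String stage : seen.keySet()) {
            if (!duplicatedAt(stage).isEmpty()) {
                return stage;
            }
        }
        return null;
    }
}
```

For example, recording ("source", 0, "a") and ("source", 1, "b"), followed by ("coFlatMap", 0, "a") and ("coFlatMap", 1, "a"), makes firstStageWithDuplicates() return "coFlatMap", which would point at the step between the source and the CoFlatMap.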
Cheers,
Aljoscha
Hi,
Before giving the method you described above a try, I tried adding the
timestamp to my data directly at the stream source.
Following is my stream source:
http://pastebin.com/AsXiStMC
and I am using the stream source as follows:
    DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));

    ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster =
        tuples.iterate()
              .withFeedbackType(MicroCluster[].class);
    //mcStream.broadcast().global();

    DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
        .flatMap(new MyCoFlatmap(k, tw))
        .keyBy(1)
        .reduce(new ReduceMC(k))
        .map(new ReturnMC());

    inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());
The problem is that when I execute this, all 4 partitions get the same data.
I don't understand why the same data is sent to all 4 partitions when
4 different tuples should go to 4 different partitions.
Can you explain this behaviour?
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html