Re: Extracting Timestamp in MapFunction

Posted by Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7478.html

Hi,
could you try pulling the problem apart, i.e. determine at which point in the pipeline you have duplicate data. Is it after the sources or in the CoFlatMap or the Map after the reduce, for example?

Cheers,
Aljoscha

On Wed, 1 Jun 2016 at 17:11 Biplob Biswas <[hidden email]> wrote:
Hi,

Before giving the method u described above a try, i tried adding the
timestamp with my data directly at the stream source.

Following is my stream source:

http://pastebin.com/AsXiStMC

and I am using the stream source as follows:

DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath,
streamSpeed));
                ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster =
tuples.iterate()
                                                                                                                .withFeedbackType(MicroCluster[].class);
                //mcStream.broadcast().global();
                DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
                                                                                                                .flatMap(new MyCoFlatmap(k,tw))
                                                                                                                .keyBy(1)
                                                                                                                .reduce(new ReduceMC(k))
                                                                                                                .map(new ReturnMC());

                inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

The problem is, when i execute this, all the 4 different partition gets the
same data, I don't really understand how is the same data sent to all the 4
partitions when it should 4 different data tuple to 4 different partitions.

Can you maybe explain this behaviour?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240p7315.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.