Extracting Timestamp in MapFunction


Extracting Timestamp in MapFunction

Biplob Biswas
Hi,

I have a source where I am using the collectWithTimestamp method and streaming the timestamp along with the actual data.

Now I want to access the timestamp values in a map function. I looked for this in the documentation at the following link, and the ExtractTimestamp method caught my interest, but I didn't understand how it works or how I could use it for my purpose.

https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html

Also, the "Working with time" hyperlink is broken, so I couldn't read anything about timestamps there either.

Any help is much appreciated.

Thanks a lot.
Biplob Biswas


Re: Extracting Timestamp in MapFunction

Aljoscha Krettek
Hi,
Right now, the only way to access the timestamps is to write a custom operator and use it with DataStream.transform(). Take a look at StreamMap, the operator implementation that executes MapFunctions.
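To make the idea concrete, here is a JDK-only model of such an operator, not Flink code: the operator receives each record together with its timestamp, passes both to the user function, and forwards the result with the timestamp unchanged. TimestampedRecord, TimestampedMapFunction and TimestampedMapOperator are hypothetical stand-ins; in Flink itself the operator would implement OneInputStreamOperator (StreamMap does this by extending AbstractUdfStreamOperator) and read StreamRecord.getTimestamp() inside processElement().

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone model (JDK only) of a map operator that exposes record timestamps.
 * All types here are hypothetical stand-ins for Flink's StreamRecord, MapFunction and StreamMap.
 */
public class TimestampedMapDemo {

    /** A value plus the timestamp the source attached to it (cf. collectWithTimestamp). */
    static final class TimestampedRecord<T> {
        final T value;
        final long timestamp;

        TimestampedRecord(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Unlike a plain MapFunction, this user function also receives the timestamp. */
    interface TimestampedMapFunction<IN, OUT> {
        OUT map(IN value, long timestamp);
    }

    /** Plays the role of the custom operator: it sees the whole record, not just the value. */
    static final class TimestampedMapOperator<IN, OUT> {
        private final TimestampedMapFunction<IN, OUT> userFunction;
        private final List<TimestampedRecord<OUT>> output;

        TimestampedMapOperator(TimestampedMapFunction<IN, OUT> userFunction,
                               List<TimestampedRecord<OUT>> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        /** Analogous to processElement(StreamRecord): apply the function, keep the timestamp. */
        void processElement(TimestampedRecord<IN> element) {
            OUT result = userFunction.map(element.value, element.timestamp);
            output.add(new TimestampedRecord<>(result, element.timestamp));
        }
    }

    /** Runs the operator over two records and returns the mapped values. */
    static List<String> run() {
        List<TimestampedRecord<String>> out = new ArrayList<>();
        TimestampedMapOperator<String, String> op =
                new TimestampedMapOperator<>((value, ts) -> value + "@" + ts, out);

        op.processElement(new TimestampedRecord<>("a", 100L));
        op.processElement(new TimestampedRecord<>("b", 250L));

        List<String> values = new ArrayList<>();
        for (TimestampedRecord<String> r : out) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a@100, b@250]
    }
}
```

Keeping the input timestamp on the output mirrors what StreamMap does when it reuses the incoming StreamRecord for its result. Later Flink releases also expose the timestamp to user code through ProcessFunction's Context.timestamp(), which avoids writing a custom operator.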


Cheers,
Aljoscha

On Mon, 30 May 2016 at 00:47 Biplob Biswas wrote:

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Extracting-Timestamp-in-MapFunction-tp7240.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Extracting Timestamp in MapFunction

Biplob Biswas
Hi,

Before trying the method you described, I tried attaching the timestamp to my data directly in the stream source.

Following is my stream source:

http://pastebin.com/AsXiStMC

and I am using the stream source as follows:

    DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));
    ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster = tuples.iterate()
            .withFeedbackType(MicroCluster[].class);
    //mcStream.broadcast().global();
    DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
            .flatMap(new MyCoFlatmap(k, tw))
            .keyBy(1)
            .reduce(new ReduceMC(k))
            .map(new ReturnMC());

    inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

The problem is that when I execute this, all 4 partitions receive the same data. I don't understand why the same data is sent to all 4 partitions when each partition should receive different tuples.

Can you maybe explain this behaviour?

Re: Extracting Timestamp in MapFunction

Aljoscha Krettek
Hi,
could you try pulling the problem apart, i.e. determine at which point in the pipeline you have duplicate data. Is it after the sources or in the CoFlatMap or the Map after the reduce, for example?
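Pulling the pipeline apart can be as simple as recording what each stage emits for a small input and finding the first stage at which an element shows up more than once. The JDK-only sketch below assumes those per-stage recordings already exist (in a Flink job they might come from a logging map inserted after each operator); StagedDuplicateCheck and its sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StagedDuplicateCheck {

    /** True if any element appears more than once in what a stage emitted. */
    static <T> boolean hasDuplicates(List<T> emitted) {
        Set<T> seen = new HashSet<>();
        for (T element : emitted) {
            if (!seen.add(element)) {
                return true;
            }
        }
        return false;
    }

    /** Walks the stages in pipeline order; returns the first stage with duplicates, or null. */
    static String firstStageWithDuplicates(Map<String, List<String>> stagesInOrder) {
        for (Map.Entry<String, List<String>> stage : stagesInOrder.entrySet()) {
            if (hasDuplicates(stage.getValue())) {
                return stage.getKey();
            }
        }
        return null;
    }

    /** Hypothetical recordings in which the duplicates already appear right after the source. */
    static Map<String, List<String>> sampleRecordings() {
        Map<String, List<String>> stages = new LinkedHashMap<>(); // preserves pipeline order
        stages.put("source", Arrays.asList("p1", "p2", "p1", "p2"));
        stages.put("coFlatMap", Arrays.asList("p1", "p2", "p1", "p2"));
        return stages;
    }

    public static void main(String[] args) {
        System.out.println(firstStageWithDuplicates(sampleRecordings())); // prints source
    }
}
```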

Cheers,
Aljoscha

On Wed, 1 Jun 2016 at 17:11 Biplob Biswas wrote:

Re: Extracting Timestamp in MapFunction

Biplob Biswas
Hi Aljoscha,

I went to the Flink hackathon at Buzzwords yesterday, where Fabian and Robert helped me with this issue. I had assumed that the file would be read by a single thread, but I was using a ParallelSourceFunction, which created 4 parallel instances (threads), each reading the whole file, so every value was emitted 4 times.
I changed it to a plain SourceFunction and then set the parallelism of the map operator to get the behaviour I wanted.
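The effect described above can be modelled without Flink: if every parallel instance of a source reads the whole file, a parallelism of 4 emits each line 4 times. The JDK-only sketch below (ParallelSourceModel and its methods are hypothetical) contrasts that with instances that each take a disjoint share of the lines. The fix in this thread corresponds to the first method with a parallelism of 1 (a plain SourceFunction), with setParallelism() applied to the downstream map; the sharding variant is only an alternative, which a Flink RichParallelSourceFunction could implement using getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** JDK-only model of why a parallel source that reads the whole file duplicates data. */
public class ParallelSourceModel {

    /** Every instance reads the complete file: parallelism p yields p copies of each line. */
    static List<String> everyInstanceReadsAll(List<String> lines, int parallelism) {
        List<String> emitted = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            emitted.addAll(lines);
        }
        return emitted;
    }

    /** Each instance reads only the lines whose index modulo p equals its own index. */
    static List<String> instancesSplitTheFile(List<String> lines, int parallelism) {
        List<String> emitted = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            for (int i = instance; i < lines.size(); i += parallelism) {
                emitted.add(lines.get(i));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> file = Arrays.asList("p1", "p2", "p3", "p4", "p5");
        System.out.println(everyInstanceReadsAll(file, 4).size()); // prints 20
        System.out.println(everyInstanceReadsAll(file, 1).size()); // prints 5
        System.out.println(instancesSplitTheFile(file, 4).size()); // prints 5
    }
}
```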

Thanks a lot for replying. :)

Regards
Biplob