Hi,
I have a source where I use the collectWithTimestamp method to stream a timestamp along with the actual data. Now I want to access that timestamp inside a map function. I looked for this in the documentation at the following link, and the extractTimestamp method looked relevant, but I didn't understand how I could use it for my purpose: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/index.html

Also, the hyperlink "Working with time" is broken, so I couldn't read anything about timestamps there either.

Any help is much appreciated. Thanks a lot.

Biplob Biswas
Hi,
right now the only way to get at the timestamps is to write a custom operator and use it with DataStream.transform(). Take a look at StreamMap, which is the operator implementation that executes MapFunctions.

I think the links in the doc should point to https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html and https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_timestamps_watermarks.html. I'll fix the documentation link.

Cheers,
Aljoscha

On Mon, 30 May 2016 at 00:47 Biplob Biswas <[hidden email]> wrote:
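[Editor's sketch] The idea behind the transform()-based approach can be modeled in plain Java (this is NOT the real Flink API — class names like TimestampedRecord and TimestampAwareMap are hypothetical stand-ins for Flink's StreamRecord and a StreamMap-like operator): a custom operator's processElement receives the whole record, timestamp included, whereas a plain MapFunction only ever sees the value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for Flink's StreamRecord: a value plus its timestamp.
class TimestampedRecord<T> {
    final T value;
    final long timestamp;
    TimestampedRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Model of a custom one-input operator (cf. Flink's StreamMap): processElement
// sees the full record, so the user function can be given the timestamp too.
class TimestampAwareMap<IN, OUT> {
    private final BiFunction<IN, Long, OUT> fn;
    private final List<TimestampedRecord<OUT>> output = new ArrayList<>();

    TimestampAwareMap(BiFunction<IN, Long, OUT> fn) {
        this.fn = fn;
    }

    void processElement(TimestampedRecord<IN> record) {
        // Apply the user function to value AND timestamp, and forward the
        // input timestamp unchanged on the emitted record.
        output.add(new TimestampedRecord<>(
                fn.apply(record.value, record.timestamp), record.timestamp));
    }

    List<TimestampedRecord<OUT>> getOutput() {
        return output;
    }
}

public class Demo {
    public static void main(String[] args) {
        TimestampAwareMap<String, String> op =
                new TimestampAwareMap<>((v, ts) -> v + "@" + ts);
        op.processElement(new TimestampedRecord<>("a", 1000L));
        op.processElement(new TimestampedRecord<>("b", 2000L));
        for (TimestampedRecord<String> r : op.getOutput()) {
            System.out.println(r.value); // a@1000, then b@2000
        }
    }
}
```

In real Flink code the operator would extend AbstractStreamOperator and implement OneInputStreamOperator, and be attached with tuples.transform(...), but the shape of processElement is the same as sketched here.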
Hi,
Before giving the method you described above a try, I tried attaching the timestamp to my data directly at the stream source. This is my stream source: http://pastebin.com/AsXiStMC and I am using it as follows:

    DataStream<Point> tuples = env.addSource(new DataStreamGenerator(filePath, streamSpeed));

    ConnectedIterativeStreams<Point, MicroCluster[]> inputsAndMicroCluster = tuples.iterate()
            .withFeedbackType(MicroCluster[].class);

    //mcStream.broadcast().global();
    DataStream<MicroCluster[]> updatedMicroCluster = inputsAndMicroCluster
            .flatMap(new MyCoFlatmap(k, tw))
            .keyBy(1)
            .reduce(new ReduceMC(k))
            .map(new ReturnMC());

    inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

The problem is that when I execute this, all 4 partitions get the same data. I don't understand how the same data can be sent to all 4 partitions when 4 different tuples should go to 4 different partitions. Can you maybe explain this behaviour?
Hi,

could you try pulling the problem apart, i.e. determine at which point in the pipeline the data is duplicated? Is it after the sources, in the CoFlatMap, or in the Map after the reduce, for example?

Cheers,
Aljoscha

On Wed, 1 Jun 2016 at 17:11 Biplob Biswas <[hidden email]> wrote:
Hi Aljoscha,
I went to the Flink hackathon at Buzzwords yesterday, where Fabian and Robert helped me with this issue. Apparently I had assumed the file would be handled by a single thread, but I was using a ParallelSourceFunction, which created 4 different threads and therefore read the same values 4 times. I changed it to a (non-parallel) SourceFunction and then set the parallelism on the map operator instead, which does what I wanted.

Thanks a lot for replying. :)

Regards,
Biplob
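[Editor's sketch] The cause of the duplication above can be illustrated in a few lines of plain Java (a toy model, not Flink code): every subtask of a parallel source executes run() independently, so a source that naively reads the whole file emits each record once per subtask.

```java
import java.util.ArrayList;
import java.util.List;

// Model of the bug: with a parallel source, EVERY parallel subtask runs the
// source logic, so a source that reads the whole file emits each line once
// per subtask instead of partitioning the work.
public class ParallelSourceDemo {
    static List<String> runSource(List<String> fileLines, int parallelism) {
        List<String> emitted = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Each parallel instance naively reads the same file end to end.
            emitted.addAll(fileLines);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("p1", "p2");
        // parallelism 4 => every tuple shows up 4 times downstream
        System.out.println(runSource(lines, 4).size()); // prints 8
        // parallelism 1 (a plain, non-parallel source) => each tuple once
        System.out.println(runSource(lines, 1).size()); // prints 2
    }
}
```

A real parallel source would either have each subtask read a disjoint split of the input, or (as done in the thread) stay non-parallel and let the downstream operators run with higher parallelism.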