Hi,
I am reading data points from a file and then I have to perform iterations over them. When I just check the data points before the iteration as follows,

    tuples.flatMap(new CheckData())

and print the count inside CheckData(), I get 2500 data points on each of 4 partitions, i.e. 10000 data points overall.

Now when I do the following:

    ConnectedIterativeStreams<Point, Centroid[]> inputsAndMicroCluster = tuples.iterate()
        .withFeedbackType(Centroid[].class);
    DataStream<Centroid[]> updatedMicroCluster = inputsAndMicroCluster
        .flatMap(new MyCoFlatmap());
    inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

then, inside MyCoFlatmap(), in the map1 method, I check the count and it's less than 1/5th of the total number of data points. I don't really know what is happening here. Why would the number of data points drop like that suddenly?

Thanks and Regards
Biplob Biswas |
Can anyone check this once, and help me out with this?
I would be really obliged. |
Can you share the complete program with me? Then I will look into it.
It could be that you are counting in the wrong way. The iteration should definitely consume all of the initial input at least once.

On Mon, Jul 4, 2016 at 12:07 PM, Biplob Biswas <[hidden email]> wrote:
> Can anyone check this once, and help me out with this?
>
> I would be really obliged.
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p7795.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. |
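One way a count can mislead, in the spirit of the remark above: with 4 partitions, each parallel instance of a function keeps its own counter, so a single printed value is a per-instance count and not the total. The following is a plain-Java sketch with no Flink involved. The class name, the method names and the round-robin distribution are assumptions made up for illustration; only the numbers (10000 points, 4 partitions) come from the first post.

```java
import java.util.Arrays;

// Hypothetical sketch: simulates per-instance counters, not Flink code.
class PerInstanceCountSketch {

    // Distributes totalRecords round-robin (an assumption) over
    // `parallelism` counters and returns the per-instance counts.
    static long[] countPerInstance(int totalRecords, int parallelism) {
        long[] counts = new long[parallelism];
        for (int record = 0; record < totalRecords; record++) {
            counts[record % parallelism]++;
        }
        return counts;
    }

    // The overall count is the sum of all per-instance counters.
    static long total(long[] counts) {
        long sum = 0;
        for (long c : counts) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] counts = countPerInstance(10000, 4);
        System.out.println("per instance: " + Arrays.toString(counts));
        System.out.println("total: " + total(counts));
    }
}
```

Comparing counts before and inside the iteration is only meaningful when both are taken at the same level, either per instance or summed over all instances.
|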
I have sent you my code in a separate email. I hope you can solve my issue.
Thanks a lot,
Biplob |
Sorry Biplob, I didn't have time to look into your code today. I will try to do it tomorrow though.

On Mon, Jul 4, 2016 at 2:53 PM, Biplob Biswas <[hidden email]> wrote:
> I have sent you my code in a separate email. I hope you can solve my issue.
>
> Thanks a lot,
> Biplob |
Thanks a lot, I would really appreciate it.
Also, please let me know if you don't understand it well; the code is not really well documented at the moment. |
I couldn't tell anything from the code. I would suggest reducing it to a minimal example with Integers that has the same flow structure (with simple types), and then let's check that again.

On Wed, Jul 6, 2016 at 9:35 AM, Biplob Biswas <[hidden email]> wrote:
> Thanks a lot, I would really appreciate it.
>
> Also, please let me know if you don't understand it well; the code is not really well documented at the moment. |
Any update?

On Wed, Jul 6, 2016 at 5:55 PM, Ufuk Celebi <[hidden email]> wrote:
> I couldn't tell anything from the code. I would suggest reducing it to a minimal example with Integers that has the same flow structure (with simple types), and then let's check that again. |
Hi,
Sorry for the late reply, I was trying different things with my code, and what I observed is very weird to me.

After some experimentation, I found that when I increase the number of centroids, the number of data points forwarded decreases; when I lower the number of centroids, the number of data points increases. In my code, the number of centroids is given by num_of_mc.

I have reduced my code to the essential part where the problem is happening. Please find the code at the link below. To be precise, in the map1 function of the coflatmap I am supposed to get all the points, but I am not getting all the points that should be coming in. I know that I won't get output for the first n points, which, as you can see from my code, just fill up the array of centroids, so I am not collecting them. Apart from that, I should get every other point, as I am collecting all other points in all other cases (I have removed 2 of the cases).

Even when I print the count of points right where a point enters the map1 function, it goes down if I set the number of centroids higher. I don't have any information about the lost points, and I still don't know why this is happening.

Here's the link to the part of the code where the problem arises:
http://pastebin.com/PVnDJeAa

I hope I am able to make myself clear now with this cleaned-up code without the extra stuff. |
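To separate the points that are skipped on purpose from the ones that go missing, it can help to write down what map1 is expected to forward. The sketch below is plain Java and is not the code behind the pastebin link: `WarmupCounter`, `numOfMc` and the fill-then-forward logic are assumptions reconstructed from the description above (the first num_of_mc points only fill the centroid array, every later point is collected).

```java
// Hypothetical sketch of the described map1 behaviour, not the real code.
class WarmupCounter {
    private final double[][] centroids; // filled by the first numOfMc points
    private int filled = 0;
    private long received = 0;
    private long forwarded = 0;

    WarmupCounter(int numOfMc) {
        this.centroids = new double[numOfMc][];
    }

    // Handles one incoming point the way the post describes map1.
    void onPoint(double[] point) {
        received++;
        if (filled < centroids.length) {
            centroids[filled++] = point; // warm-up: stored, not forwarded
        } else {
            forwarded++;                 // every later point would be collected
        }
    }

    long received() { return received; }

    long forwarded() { return forwarded; }

    // Expected number of forwarded points for one instance.
    static long expectedForwarded(long received, int numOfMc) {
        return Math.max(0, received - numOfMc);
    }

    public static void main(String[] args) {
        WarmupCounter counter = new WarmupCounter(100);
        for (int i = 0; i < 2500; i++) {
            counter.onPoint(new double[] {i});
        }
        System.out.println("received=" + counter.received()
                + " forwarded=" + counter.forwarded());
    }
}
```

Under these assumptions, raising the number of centroids should lower the forwarded count only by that number per instance. If the received counter itself is already below the per-partition input, the points are lost before map1 is called, which points at the iteration rather than at the centroid logic.
|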
Hi Ufuk,
Did you get time to go through my issue? I just wanted to follow up to see whether I can get a solution or not. |
Unfortunately, no. It's expected for streaming iterations to lose data (known shortcoming), but I don't see why they never see the initial input. Maybe Gyula or Paris (they worked on this previously) can chime in.

– Ufuk

On Tue, Jul 19, 2016 at 10:15 AM, Biplob Biswas <[hidden email]> wrote:
> Hi Ufuk,
>
> Did you get time to go through my issue? I just wanted to follow up to see whether I can get a solution or not. |
Hi Ufuk,
Thanks for the update, is there any known way to fix this issue? Any workaround that you know of, which I can try? |
CC Gyula and Paris in case they might want to help out.

On Tue, Jul 19, 2016 at 11:43 AM, Biplob Biswas <[hidden email]> wrote:
> Hi Ufuk,
>
> Thanks for the update, is there any known way to fix this issue? Any workaround that you know of, which I can try? |
This is possibly related to the way the queue between StreamIterationTail and Head is currently implemented.
I think this part is a bit prone to record loss when things get wacky and backpressure kicks in (but at least it avoids deadlocks, right?).
I don't have the time to look into the code right now, but I am going to focus on progress/loops by September, hopefully with a FLIP, which solves that part as well.
If this is urgent, please go ahead and check this now, I think that queue timeouts cause this...
Paris
PS: on my yet incomplete PR (I know I know) I basically disabled queue polling timeouts since the checkpoint overhead on the StreamIterationHead almost always led to record loss.
https://github.com/apache/flink/pull/1668
|
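The kind of loss described above can be reproduced outside Flink with a bounded queue whose producer gives up after a timeout. The following is a plain-Java sketch, not the StreamIterationTail/Head implementation: the class name, capacity and timeout are made up, and the idea that the feedback side behaves like a timed `offer` into a bounded queue is an assumption used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a timed offer into a full bounded queue drops records.
class TimedOfferLossSketch {

    // Offers `records` integers into a bounded queue that nobody drains
    // (a stalled consumer under backpressure) and returns how many offers
    // timed out, i.e. how many records were silently dropped.
    static int countDropped(int capacity, int records, long timeoutMillis) {
        BlockingQueue<Integer> feedback = new ArrayBlockingQueue<>(capacity);
        int dropped = 0;
        try {
            for (int i = 0; i < records; i++) {
                boolean accepted =
                        feedback.offer(i, timeoutMillis, TimeUnit.MILLISECONDS);
                if (!accepted) {
                    dropped++; // the timeout elapsed while the queue was full
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Capacity 2, 5 records, 10 ms timeout: 2 accepted, 3 dropped.
        System.out.println("dropped: " + countDropped(2, 5, 10));
    }
}
```

A blocking `put` would never drop a record, but it can wait forever if the other side of the loop stalls, which is the deadlock trade-off mentioned above.
|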
@Paris Thanks for the prompt feedback! I really have to check out your PR :)
@Biplob: If I understand correctly, a possible workaround in the meantime seems to be to use `setBufferTimeout(0)` on your StreamExecutionEnvironment. Could you try that?

On Wed, Jul 20, 2016 at 12:30 PM, Paris Carbone <[hidden email]> wrote:
> This is possibly related to the way the queue between StreamIterationTail and Head is currently implemented.
> I think this part is a bit prone to record loss when things get wacky and backpressure kicks in (but at least it avoids deadlocks, right?).
>
> I don't have the time to look into the code right now, but I am going to focus on progress/loops by September, hopefully with a FLIP, which solves that part as well.
>
> If this is urgent, please go ahead and check this now, I think that queue timeouts cause this...
>
> Paris
>
> PS: on my yet incomplete PR (I know I know) I basically disabled queue polling timeouts since the checkpoint overhead on the StreamIterationHead almost always led to record loss.
> https://github.com/apache/flink/pull/1668 |
Hi Max,
Yeah, I tried that and it's definitely better. Only a few points go missing now, compared to a huge number in the beginning. For now, it's good enough for me and my work. Thanks a lot for the workaround.
-Biplob |