 
	
					
		
	
					| 
Hi,

I am reading data points from a file and then have to iterate over them. When I just check the data points before the iteration with

    tuples.flatMap(new CheckData())

and print the count inside CheckData(), I get 2500 data points in each of the 4 partitions, i.e. 10000 data points overall.

Now when I do the following:

    ConnectedIterativeStreams<Point, Centroid[]> inputsAndMicroCluster =
        tuples.iterate().withFeedbackType(Centroid[].class);
    DataStream<Centroid[]> updatedMicroCluster =
        inputsAndMicroCluster.flatMap(new MyCoFlatmap());
    inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());

and check the count inside the map1 method of MyCoFlatmap(), it is less than 1/5th of the total number of data points.

I don't really know what exactly is happening here. Why would the number of data points suddenly drop like that?

Thanks and regards,
Biplob Biswas | 
 
	
					
		
	
					| 
Can anyone take a look at this and help me out? I would be much obliged. | 
 
	
					
		
	
					| 
Can you share the complete program with me? Then I would look into it. It could be that you are counting in the wrong way. The iteration should definitely consume all of the initial input at least once.

On Mon, Jul 4, 2016 at 12:07 PM, Biplob Biswas <[hidden email]> wrote:
> Can anyone check this once, and help me out with this?
>
> I would be really obliged.
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p7795.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com. | 
 
	
					
		
	
					| 
I have sent you my code in a separate email. I hope you can help me solve my issue.

Thanks a lot,
Biplob | 
 
	
					
		
	
					| 
Sorry Biplob, I didn't have time to look into your code today. I will try to do it tomorrow though.

On Mon, Jul 4, 2016 at 2:53 PM, Biplob Biswas <[hidden email]> wrote:
> I have sent you my code in a separate email, I hope you can solve my issue.
>
> Thanks a lot
> Biplob | 
 
	
					
		
	
					| 
Thanks a lot, I would really appreciate it.

Also, please let me know if anything in the code is hard to follow; it is not well documented at the moment. | 
 
	
					
		
	
					| 
I couldn't tell anything from the code. I would suggest reducing it to a minimal example with Integers that has the same flow structure (with simple types), and then let's check that again.

On Wed, Jul 6, 2016 at 9:35 AM, Biplob Biswas <[hidden email]> wrote:
> Thanks a lot, would really appreciate it.
>
> Also. please let me know if you don't understand it well, the documentation
> is not really great at the moment in the code. | 
 
	
					
		
	
					| 
Any update?

On Wed, Jul 6, 2016 at 5:55 PM, Ufuk Celebi <[hidden email]> wrote:
> I couldn't tell anything from the code. I would suggest to reduce it
> to a minimal example with Integers where you do the same thing flow
> structure wise (with simple types) and let's check that again.
>
> On Wed, Jul 6, 2016 at 9:35 AM, Biplob Biswas <[hidden email]> wrote:
>> Thanks a lot, would really appreciate it.
>>
>> Also. please let me know if you don't understand it well, the documentation
>> is not really great at the moment in the code. | 
 
	
					
		
	
					| 
			 
Hi,

Sorry for the late reply, I was trying different things in my code, and what I observed is very weird to me.

After some experimentation, I found that when I increase the number of centroids, the number of data points forwarded decreases, and when I lower the number of centroids, the number of data points increases. In my code, the number of centroids is given by num_of_mc.

I have reduced my code to the essential part where the problem occurs; please find it at the link below. To be precise, the map1 function of the coflatmap is supposed to receive all the points, but it does not. I know that I won't get any output for the first n points, which, as you can see from my code, just fill up the array of centroids, so I am not collecting them. Apart from that, I should get every other point, since I collect the point in all other cases (I have removed 2 of the cases).

Even when I print the count of points right where a point enters the map1 function, the count drops if I choose a higher number of centroids. I don't have any information about the lost points, and I still don't know why this is happening.

Here's the link to the part of the code where the problem arises:
http://pastebin.com/PVnDJeAa

I hope this cleaned-up code, without the extra stuff, makes the problem clearer. | 
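
For reference, the baseline described in this post (the first n points only fill the centroid array, every later point is collected) can be written down in plain Java. This is only a rough sketch and not the code from the pastebin link: the class name `ExpectedForwardCount`, the method `forwarded`, and the value of 100 centroids are made up for illustration, and it models a single function instance that keeps its own centroid array. The 2500 points are the per-partition count from the first post.

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedForwardCount {

    // Models one function instance: the first numOfMc points only fill the
    // centroid list and are not emitted; every later point is collected once.
    public static int forwarded(int points, int numOfMc) {
        List<Integer> centroids = new ArrayList<>();
        int collected = 0;
        for (int point = 0; point < points; point++) {
            if (centroids.size() < numOfMc) {
                centroids.add(point); // fill phase: nothing is emitted
            } else {
                collected++;          // all other cases: the point is emitted
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        // 2500 points per partition (from the first post), 100 centroids (hypothetical)
        System.out.println(forwarded(2500, 100)); // prints 2400
    }
}
```

Under these assumptions, a shortfall of more than num_of_mc points per instance cannot be explained by the fill phase alone and points to records being lost elsewhere.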
 
	
					
		
	
					| 
Hi Ufuk,

Did you get time to go through my issue? I just wanted to follow up to see whether I can get a solution or not. | 
 
	
					
		
	
					| 
Unfortunately, no. It's expected for streaming iterations to lose data (known shortcoming), but I don't see why they never see the initial input. Maybe Gyula or Paris (they worked on this previously) can chime in.

– Ufuk

On Tue, Jul 19, 2016 at 10:15 AM, Biplob Biswas <[hidden email]> wrote:
> Hi Ufuk,
>
> Did you get time to go through my issue, just wanted to follow up to see
> whether I can get a solution or not. | 
 
	
					
		
	
					| 
Hi Ufuk,

Thanks for the update. Is there any known way to fix this issue, or any workaround that I can try? | 
 
	
					
		
	
					| 
CC Gyula and Paris in case they might want to help out.

On Tue, Jul 19, 2016 at 11:43 AM, Biplob Biswas <[hidden email]> wrote:
> Hi Ufuk,
>
> Thanks for the update, is there any known way to fix this issue? Any
> workaround that you know of, which I can try? | 
 
	
					
		
	
					| 
		
This is possibly related to the way the queue between StreamIterationTail and Head is currently implemented. I think this part is a bit prone to record loss when things get wacky and backpressure kicks in (but at least it avoids deadlocks, right?).

I don't have the time to look into the code right now, but I am going to focus on progress/loops by September, hopefully with a FLIP, which solves that part as well.

If this is urgent, please go ahead and check this now. I think that queue timeouts cause this...

Paris

PS: In my still incomplete PR (I know, I know) I basically disabled queue polling timeouts, since the checkpoint overhead on the StreamIterationHead almost always led to record loss.
https://github.com/apache/flink/pull/1668
 | 
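
The general mechanism Paris describes, a timed hand-off that gives up when the receiving side is slow, can be reproduced with a plain `java.util.concurrent` queue. The sketch below is not Flink's actual StreamIterationTail/Head code; the class `TimedHandoffDemo`, the method `droppedWithStalledConsumer`, the capacity of 1 and the 10 ms timeout are all assumptions chosen for illustration. It only shows that `offer` with a timeout silently drops a record when the queue stays full, which is what backpressure looks like from the producer's side.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedHandoffDemo {

    // Offers `total` records to a capacity-1 queue that nobody drains and
    // returns how many were dropped because the timed offer gave up.
    public static int droppedWithStalledConsumer(int total, long timeoutMillis) {
        BlockingQueue<Integer> feedback = new ArrayBlockingQueue<>(1);
        int dropped = 0;
        for (int record = 0; record < total; record++) {
            try {
                // offer(...) returns false if the queue is still full after the timeout
                if (!feedback.offer(record, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    dropped++;
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop early
                Thread.currentThread().interrupt();
                break;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // The first record fits into the queue, the other four time out.
        System.out.println("dropped = " + droppedWithStalledConsumer(5, 10)); // dropped = 4
    }
}
```

A blocking `put` in place of the timed `offer` would never drop a record, but the producer would wait forever if the consumer never catches up, which is the deadlock the timeout is meant to avoid.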
 
	
					
		
	
					| 
@Paris: Thanks for the prompt feedback! I really have to check out your PR :)

@Biplob: If I understand correctly, a possible workaround in the meantime seems to be to call `setBufferTimeout(0)` on your StreamExecutionEnvironment. Could you try that?

On Wed, Jul 20, 2016 at 12:30 PM, Paris Carbone <[hidden email]> wrote:
> This is possibly related to the way the queue between StreamIterationTail
> and Head is currently implemented.
> I think this part is a bit prone to records loss when things get wacky and
> backpressure kicks in (but at least it avoids deadlocks, right?).
>
> I don’t have the time availability to look into the code right now but I am
> going to focus on a progress/loops by September, hopefully with a FLIP,
> which solves that part as well.
>
> If this is urgent, please go ahead and check this now, I think that queue
> timeouts cause this...
>
> Paris
>
> PS: on my yet incomplete PR (I know I know) I basically disabled queue
> polling timeouts since the checkpoint overhead on the StreamIterationHead
> almost always led to record loss.
> https://github.com/apache/flink/pull/1668
>
> On 20 Jul 2016, at 11:57, Maximilian Michels <[hidden email]> wrote:
>
> CC Gyula and Paris in case they might want to help out.
>
> On Tue, Jul 19, 2016 at 11:43 AM, Biplob Biswas <[hidden email]> wrote:
>
> Hi Ufuk,
>
> Thanks for the update, is there any known way to fix this issue? Any
> workaround that you know of, which I can try? | 
 
	
					
		
	
					| 
Hi Max,

Yeah, I tried that and it's definitely better. Only a few points go missing now, compared to the huge number that went missing in the beginning. For now, that is good enough for me and my work. Thanks a lot for the workaround.

-Biplob | 