Data point goes missing within iteration

Biplob Biswas
Hi,

I am reading data points from a file and then I have to iterate over them.

When I just check the data points before the iteration as follows,

tuples.flatMap(new CheckData())

and print the count inside CheckData(), I get 2500 data points on each of the 4 partitions, i.e. 10000 data points overall.
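
For reference, the arithmetic behind that check can be sketched in plain Java. This is only an illustration and not Flink code: the class and method names are hypothetical, and the round-robin distribution is an assumption (how Flink actually assigns records to partitions depends on the source and the partitioning in use). The point is that a counter kept inside a function is local to one parallel instance, so the overall count is the sum over all instances.

```java
import java.util.Arrays;

/** Plain-Java illustration only; not Flink code. All names are hypothetical. */
final class PartitionCountSketch {

    /** Distributes totalPoints round-robin (an assumption) over local counters. */
    static long[] localCounts(int totalPoints, int partitions) {
        long[] counts = new long[partitions];
        for (int i = 0; i < totalPoints; i++) {
            counts[i % partitions]++; // each parallel instance only sees its own share
        }
        return counts;
    }

    /** The overall count is the sum of the per-instance counters. */
    static long total(long[] counts) {
        return Arrays.stream(counts).sum();
    }

    public static void main(String[] args) {
        long[] counts = localCounts(10_000, 4);
        // prints "[2500, 2500, 2500, 2500] -> 10000"
        System.out.println(Arrays.toString(counts) + " -> " + total(counts));
    }
}
```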


Now when I do the following :

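// Open the iteration; the feedback stream carries Centroid[] records.
ConnectedIterativeStreams<Point, Centroid[]> inputsAndMicroCluster =
        tuples.iterate().withFeedbackType(Centroid[].class);

DataStream<Centroid[]> updatedMicroCluster =
        inputsAndMicroCluster.flatMap(new MyCoFlatmap());

// Close the loop, broadcasting the updated centroids to all parallel instances.
inputsAndMicroCluster.closeWith(updatedMicroCluster.broadcast());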


Then, inside map1 of MyCoFlatmap(), I check the count, and it is less than 1/5th of the total number of data points.


I don't really know what exactly is happening here. Why would the number of data points suddenly drop like that?

Thanks and Regards
Biplob Biswas

Re: Data point goes missing within iteration

Biplob Biswas
Can anyone check this once, and help me out with this?

I would be really obliged.

Re: Data point goes missing within iteration

Ufuk Celebi
Can you share the complete program with me? Then I will look into it.
It could be that you are counting incorrectly. The iteration should
definitely consume all of the initial input at least once.

On Mon, Jul 4, 2016 at 12:07 PM, Biplob Biswas <[hidden email]> wrote:
> Can anyone check this once, and help me out with this?
>
> I would be really obliged.

Re: Data point goes missing within iteration

Biplob Biswas
I have sent you my code in a separate email; I hope you can help me solve my issue.

Thanks a lot
Biplob

Re: Data point goes missing within iteration

Ufuk Celebi
Sorry Biplob, I didn't have time to look into your code today. I will
try to do it tomorrow though.

On Mon, Jul 4, 2016 at 2:53 PM, Biplob Biswas <[hidden email]> wrote:

> I have sent you my code in a separate email, I hope you can solve my issue.
>
> Thanks a lot
> Biplob

Re: Data point goes missing within iteration

Biplob Biswas
Thanks a lot, would really appreciate it.

Also, please let me know if anything is unclear; the code is not well documented at the moment.

Re: Data point goes missing within iteration

Ufuk Celebi
I couldn't tell anything from the code. I would suggest reducing it
to a minimal example with Integers (simple types) that has the same
flow structure, and then we can check again.

On Wed, Jul 6, 2016 at 9:35 AM, Biplob Biswas <[hidden email]> wrote:

> Thanks a lot, would really appreciate it.
>
> Also. please let me know if you don't understand it well, the documentation
> is not really great at the moment in the code.

Re: Data point goes missing within iteration

Ufuk Celebi
Any update?

On Wed, Jul 6, 2016 at 5:55 PM, Ufuk Celebi <[hidden email]> wrote:

> I couldn't tell anything from the code. I would suggest to reduce it
> to a minimal example with Integers where you do the same thing flow
> structure wise (with simple types) and let's check that again.

Re: Data point goes missing within iteration

Biplob Biswas
Hi,

Sorry for the late reply; I was trying different things in my code, and what I observed seems very odd to me.

After some experimentation, I found that when I increase the number of centroids, the number of data points forwarded decreases, and when I lower the number of centroids, the number of data points increases.
In my code, the number of centroids is given by num_of_mc.

I have reduced my code to the essential part where the problem occurs. Please find the code at the link below.


To be precise, in the map1 function of the CoFlatMap I am supposed to get all the points, but I am not getting all of them.

I know that I won't get output for the first n points, which, as you can see from my code, are only used to fill up the array of centroids, so I am not collecting them.
Apart from that, I should get every other point, since I collect the point in all other cases (I have removed 2 of the cases).

Even when I print the count of points right as each point enters the map1 function, the count drops if I set the number of centroids higher.


I don't have any information about the lost points, and I still don't know why this is happening.


Here's the link to the part of the code where the problem arises
http://pastebin.com/PVnDJeAa

I hope the problem is clearer now with this cleaned-up code, stripped of everything unrelated.

Re: Data point goes missing within iteration

Biplob Biswas
Hi Ufuk,

Did you get time to look into my issue? I just wanted to follow up to see whether a solution is possible.

Re: Data point goes missing within iteration

Ufuk Celebi
Unfortunately, no. It's expected for streaming iterations to lose
data (a known shortcoming), but I don't see why they never see the
initial input. Maybe Gyula or Paris (they worked on this previously)
can chime in.

– Ufuk

On Tue, Jul 19, 2016 at 10:15 AM, Biplob Biswas
<[hidden email]> wrote:

> Hi Ufuk,
>
> Did you get time to go through my issue, just wanted to follow up to see
> whether I can get a solution or not.

Re: Data point goes missing within iteration

Biplob Biswas
Hi Ufuk,

Thanks for the update. Is there any known way to fix this issue, or any workaround that I can try?

Re: Data point goes missing within iteration

Maximilian Michels
CC Gyula and Paris in case they might want to help out.

On Tue, Jul 19, 2016 at 11:43 AM, Biplob Biswas
<[hidden email]> wrote:

> Hi Ufuk,
>
> Thanks for the update, is there any known way to fix this issue? Any
> workaround that you know of, which I can try?

Re: Data point goes missing within iteration

Paris Carbone
This is possibly related to the way the queue between StreamIterationTail and StreamIterationHead is currently implemented.
I think this part is somewhat prone to record loss when things get wacky and backpressure kicks in (but at least it avoids deadlocks, right?).

I don't have time to look into the code right now, but I am going to focus on progress/loops by September, hopefully with a FLIP that solves this part as well.

If this is urgent, please go ahead and check this now; I think that queue timeouts cause this...

Paris

PS: In my still incomplete PR (I know, I know) I basically disabled queue polling timeouts, since the checkpoint overhead on the StreamIterationHead almost always led to record loss.
https://github.com/apache/flink/pull/1668
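
To make the suspected mechanism concrete, here is a minimal plain-Java simulation of a bounded queue that is written to with a timeout. It is only a sketch under stated assumptions and not Flink's actual implementation: the class name, method name, capacity, and timeout are all hypothetical, and the stalled reader stands in for a head that is busy (for example under backpressure). It shows how records can be dropped silently once the queue stays full for longer than the timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-Java simulation only; NOT Flink code. All names are hypothetical. */
final class FeedbackQueueSketch {

    /**
     * Offers `records` integers to a bounded queue that nobody drains,
     * giving up on each one after `timeoutMillis`.
     * Returns the number of records that were dropped.
     */
    static int countDropped(int capacity, int records, long timeoutMillis) {
        BlockingQueue<Integer> feedback = new ArrayBlockingQueue<>(capacity);
        int dropped = 0;
        try {
            for (int i = 0; i < records; i++) {
                // offer(...) returns false if the queue stayed full for the whole timeout.
                boolean accepted = feedback.offer(i, timeoutMillis, TimeUnit.MILLISECONDS);
                if (!accepted) {
                    dropped++; // the record is lost without any error being raised
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while offering", e);
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Capacity 2, 5 records, no reader: the first 2 fit, the remaining 3 time out.
        System.out.println("dropped: " + countDropped(2, 5, 10)); // prints "dropped: 3"
    }
}
```

In this sketch a blocking put() would lose nothing, but with a reader that never drains the queue it would hang forever, which is the deadlock trade-off mentioned above.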

On 20 Jul 2016, at 11:57, Maximilian Michels <[hidden email]> wrote:

CC Gyula and Paris in case they might want to help out.


Re: Data point goes missing within iteration

Maximilian Michels
@Paris Thanks for the prompt feedback! I really have to check out your PR :)

@Biplob: If I understand correctly, a possible workaround in the
meantime seems to be to use `setBufferTimeout(0)` on your
StreamExecutionEnvironment. Could you try that?



On Wed, Jul 20, 2016 at 12:30 PM, Paris Carbone <[hidden email]> wrote:

> This is possibly related to the way the queue between StreamIterationTail
> and Head is currently implemented.
> I think this part is a bit prone to records loss when things get wacky and
> backpressure kicks in (but at least it avoids deadlocks, right?).
>
> I don’t have the time availability to look into the code right now but I am
> going to focus on a progress/loops by September, hopefully with a FLIP,
> which solves that part as well.
>
> If  this is urgent, please go ahead and check this now, I think that queue
> timeouts cause this...
>
> Paris
>
> PS: on my yet incomplete PR (I know I know) I basically disabled queue
> polling timeouts since the checkpoint overhead on the StreamIterationHead
> almost always led to record loss.
> https://github.com/apache/flink/pull/1668

Re: Data point goes missing within iteration

Biplob Biswas
Hi Max,

Yeah, I tried that and it's definitely better. Only a few points go missing, compared to a huge number in the beginning. For now, that's good enough for me and my work.

Thanks a lot for the workaround.

-Biplob