Regarding Broadcast of datasets in streaming context

15 messages

Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi, I have yet another question, this time about maintaining a global list of centroids.

I am trying to implement the CluStream algorithm, and for that purpose I have the initial set of centres in a Flink DataSet. I need to update the set of centres for every data tuple that arrives from the stream. From two other posts with similar questions, I gather that the suggested approach for streams is to use the co-map operator and handle the two inputs in two separate map functions.

My idea was to broadcast the DataSet to each Flink partition and, whenever a data tuple is mapped to a partition by a map function, update the broadcast DataSet.
As this is currently not possible, I was thinking of broadcasting the DataStream instead, using

"ds.broadcast()"

so that every partition receives the streamed tuple. I would then use a normal flatMap function over the centres, use the broadcast tuple to update the centres, and return the updated set of centres.

My question is: would this work? If yes, could someone give an example of the DataStream broadcast function and show how to retrieve the broadcast stream in a map function?

Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
I am pretty new to Flink, so could anyone at least give me an example of how the DataStream.broadcast() method works? From the documentation I get the following:

broadcast()
Sets the partitioning of the DataStream so that the output elements are broadcasted to every parallel instance of the next operation.

If the output elements are broadcast, how are they retrieved? Or am I looking at this method in completely the wrong way?

Thanks
Biplob Biswas

Re: Regarding Broadcast of datasets in streaming context

Aljoscha Krettek
Hi Biplob,
one of our developers had a stream clustering example a while back. It was using a broadcast feedback edge with a co-operator to update the centroids. I'll directly include him in the email so that he will notice and can send you the example.

Cheers,
Aljoscha

In reply to Biplob Biswas (Thu, 28 Apr 2016 at 13:57).

Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
That would be great; any example would help me proceed with my work.
Thanks a lot.


Re: Regarding Broadcast of datasets in streaming context

Gyula Fóra-2
Hi Biplob,

I have implemented a similar algorithm, as Aljoscha mentioned.

The first things to clarify are the following:
There is currently no abstraction for keeping objects (in your case centroids) in a centralized way that can be updated/read by all operators. This would probably be very costly and is actually not necessary in your case.

Broadcasting a stream, in contrast with other partitioning methods, means that the events will be replicated to all downstream operators. This is not a magical operator that will make state available among parallel instances.
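
To make the difference concrete, here is a small plain-Java simulation. It does not use Flink at all; the class and method names are made up for illustration, and the round-robin partitioner is only there as a contrasting example. It models a stage with several parallel instances, each with its own inbox: broadcasting copies a record into every inbox, while round-robin delivers it to exactly one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of stream partitioning; hypothetical names, not Flink code. */
final class BroadcastSketch {

    /** Creates one empty inbox per (simulated) parallel instance. */
    static List<List<String>> newInboxes(int parallelism) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    /** Broadcast: every record is replicated to every instance. */
    static List<List<String>> broadcast(List<String> records, int parallelism) {
        List<List<String>> inboxes = newInboxes(parallelism);
        for (String record : records) {
            for (List<String> inbox : inboxes) {
                inbox.add(record);
            }
        }
        return inboxes;
    }

    /** Round-robin: every record goes to exactly one instance. */
    static List<List<String>> roundRobin(List<String> records, int parallelism) {
        List<List<String>> inboxes = newInboxes(parallelism);
        for (int i = 0; i < records.size(); i++) {
            inboxes.get(i % parallelism).add(records.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println("broadcast:   " + broadcast(records, 2));
        System.out.println("round-robin: " + roundRobin(records, 2));
    }
}
```

Note that each inbox is a separate copy. Nothing in this model lets one instance read or modify what another instance holds, which is the point made above: broadcasting replicates events, it does not share state.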

Now let me explain what I think you want from Flink and how to do it :)

You have an input data stream and a set of centroids to be updated based on the incoming records. As you want to do this in parallel, you have an operator (let's say a flatmap) that keeps the centroids locally and updates them based on its inputs. Now you have a set of independently updated centroids, so you want to merge them and update the centroids in each flatmap.

Let's see how to do this. Given that you have your centroids locally, updating them is super easy, so I will not talk about that. The problematic part is periodically merging and "broadcasting" the centroids so that all the flatmaps eventually see the same centroids (for clustering they probably don't have to be identical at all times). There is no operator for sending state (centroids) between subtasks, so you have to be clever here. We can actually use cyclic streams to solve this problem by sending the centroids as simple events to a CoFlatMap:

DataStream<Point> input = ...;
ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids = input.iterate().withFeedbackType(Centroids.class);
DataStream<Centroids> updatedCentroids = inputsAndCentroids.flatMap(new MyCoFlatmap());
inputsAndCentroids.closeWith(updatedCentroids.broadcast());

MyCoFlatmap would be a CoFlatMapFunction which on one input receives events and updates its local centroids (periodically outputting the centroids), and on the other input receives the centroids sent by the other flatmaps and merges them into the local ones.
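
The logic inside such a two-input function could look roughly like the plain-Java sketch below. This is not Flink code and not the example mentioned earlier in the thread: CentroidWorker, onPoint, onCentroids and snapshot are hypothetical names, the points are one-dimensional to keep it short, each initial centroid is counted as one observation, and merging by a simple average is an assumption, since the thread does not say how the merge should be done.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java stand-in for the two-input function described above; not Flink code.
 * onPoint plays the role of the first input, onCentroids the second.
 */
final class CentroidWorker {
    private final double[] centroids; // local 1-D centroids
    private final long[] counts;      // observations per centroid
    private final int emitEvery;      // emit the local centroids every N points
    private long seen = 0;

    CentroidWorker(double[] initial, int emitEvery) {
        this.centroids = initial.clone();
        this.counts = new long[initial.length];
        Arrays.fill(this.counts, 1L); // assumption: the initial centroid counts as one point
        this.emitEvery = emitEvery;
    }

    /** First input: absorb a point into the nearest local centroid. */
    void onPoint(double point, List<double[]> out) {
        int nearest = 0;
        for (int i = 1; i < centroids.length; i++) {
            if (Math.abs(point - centroids[i]) < Math.abs(point - centroids[nearest])) {
                nearest = i;
            }
        }
        counts[nearest]++;
        // Running mean: move the centroid towards the new point.
        centroids[nearest] += (point - centroids[nearest]) / counts[nearest];
        seen++;
        if (seen % emitEvery == 0) {
            out.add(centroids.clone()); // emitted only periodically, not on every point
        }
    }

    /** Second input: merge a centroid set that came back over the feedback edge. */
    void onCentroids(double[] remote) {
        for (int i = 0; i < centroids.length; i++) {
            centroids[i] = (centroids[i] + remote[i]) / 2.0; // assumed merge rule
        }
    }

    double[] snapshot() {
        return centroids.clone();
    }

    public static void main(String[] args) {
        CentroidWorker worker = new CentroidWorker(new double[] {0.0, 10.0}, 2);
        List<double[]> emitted = new ArrayList<>();
        worker.onPoint(2.0, emitted);
        worker.onPoint(9.0, emitted);
        worker.onCentroids(new double[] {3.0, 8.5});
        System.out.println(Arrays.toString(worker.snapshot()));
    }
}
```

Each parallel instance would own one such object, so the updates inside it never race with each other; the instances only influence one another through the centroid sets they receive on the second input.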

This might be a lot to take in at first, so you might want to read up on streaming iterations and connected streams before you start.

Let me know if this makes sense.

Cheers,
Gyula


In reply to Biplob Biswas (Thu, 28 Apr 2016 at 14:41).

Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi Gyula,

I read your workaround and started reading about Flink iterations, CoFlatMap operators and related topics. I now understand a few things, but the solution you provided is still not completely clear to me.

Here is what I understand from your post:
1. You initially have a DataStream of points, on which you iterate, and 'withFeedbackType' defines the type of the connected stream, so rather than "Points" the type is now "Centroids".

2. On this connected stream (which, as I understand it, only has the streamed points right now), you run a flatMap operator. And you mention:
"MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events and update its local centroids (and periodically output the centroids) and on the other input would send centroids of other flatmaps and would merge them to the local."
I don't understand this part completely. If I am not wrong, you are saying that the CoFlatMap function would have two map functions. What specifically am I doing in each map function?

3. Lastly, the updated centroids that come out of the CoFlatMap function are fed back to the stream, and this is where I get lost again. How are the centroids fed back, and if they are fed back, what happens to the point stream? And if they are somehow fed back, how do I catch them in the CoFlatMap function?


If I understand this correctly, in your code the first set of centroids is created in the CoFlatMap function and you don't have a list of centroids to start with. Is my assumption correct?

I went through the iteration in the KMeans example at the following link:
https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java

and I understand how that works, but I am still not clear on how your example works.

Could you please explain it a bit more, perhaps with some examples?

Thanks a lot.


Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi Gyula,

I now understand better how this might work, and it's fascinating. I still have one question about the CoFlatMap function, though.

First, let me explain what I understand, so you can tell me whether it is correct:
1. The connected iterative stream ensures that the CoFlatMap function receives both the points and the centroids, which are broadcast on each iteration as defined by closeWith.

2. So in the CoFlatMap function, one map function receives the points and the other receives the broadcast centroids.

Now comes the part where I am guessing a bit, because I don't understand it from the theory.
3. Assuming I can use the broadcast centroids, I find the centroid nearest to the streamed point, update that centroid, and use only one of the collectors to return the updated list of centroids.


My question is this: I am assuming that this operation is not done in parallel, because if the streams are processed in parallel, how would I ensure that the centroids are updated correctly when multiple points can try to update the same centroid at the same time?

I hope I made myself clear.

Thanks and Regards
Biplob

Re: Regarding Broadcast of datasets in streaming context

Gyula Fóra
Hey,

I think you got the right idea :)

Your CoFlatMap will receive all the centroids that you send to the stream in the closeWith call. This means that whenever you collect a new set of centroids, it will be fed back into the iteration. For that reason you don't want to send the centroids out on the collector every time, only periodically.
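
A rough back-of-the-envelope count shows why. The helper below is plain Java with made-up names, and it assumes every parallel instance sees the same number of points and that each emitted centroid set is broadcast to all instances, including the one that emitted it.

```java
/** Counts feedback deliveries under the stated assumptions; hypothetical helper, not Flink code. */
final class FeedbackVolume {

    /**
     * Number of centroid sets delivered over the feedback edge when each of
     * `parallelism` instances processes `pointsPerInstance` points and emits
     * its centroids once every `emitEvery` points.
     */
    static long deliveries(long pointsPerInstance, int parallelism, int emitEvery) {
        long emissionsPerInstance = pointsPerInstance / emitEvery;
        long totalEmissions = emissionsPerInstance * parallelism;
        // Broadcasting copies each emitted set to every parallel instance.
        return totalEmissions * parallelism;
    }

    public static void main(String[] args) {
        System.out.println(deliveries(1000, 4, 1));
        System.out.println(deliveries(1000, 4, 100));
    }
}
```

With 4 instances and 1000 points each, emitting after every point gives 16000 feedback deliveries, while emitting every 100 points gives 160. The feedback traffic grows with the square of the parallelism, so the emit interval is the main knob for keeping it under control.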

The order in which these arrive is pretty much arbitrary, so if ordering is important you need to add some logic by which you can order them.

I'm not sure if this helped or not :D

Gyula

In reply to Biplob Biswas (Mon, 2 May 2016 at 13:13).

Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi Gyula,

Could you explain a bit why I wouldn't want the centroids to be collected after every point?

I mean, once I get a streamed point via the map1 function, I would want to compare the distance from the point to each centroid that arrives via the map2 function, and keep comparing for every centroid that comes in subsequently. Once a centroid has been updated, shouldn't I collect the entire set, thus updating a centroid and sending it back for the next point in the iteration?

I may not be understanding the concept properly here, so an example snippet would help in the long run.

Thanks & Regards
Biplob
Gyula Fóra wrote
Hey,

I think you got the good idea :)

So your coflatmap will get all the centroids that you have sent to the
stream in the closeWith call. This means that whenever you collect a new
set of centroids they will be iterated back. This means you don't always
want to send the centroids out on the collector, only periodically.

The order in which these come is pretty much arbitrary so you need to make
sure to add some logic by which you can order it if this is important.

Im not sure if this helped or not :D

Gyula


Re: Regarding Broadcast of datasets in streaming context

Gyula Fóra
Hi,

Iterating after every incoming point/centroid update means that you basically defeat the purpose of having parallelism in your Flink job.

If you only "sync" the centroids periodically via the broadcast, your program can run efficiently in parallel. This should be fine for machine learning use cases, where the results should converge anyway.

Gyula
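
For readers following along, here is a minimal sketch of the periodic-sync idea described above. It is plain Java with no Flink dependency, so it only illustrates the control flow and is not code from this thread. The class name PeriodicSyncSketch, the 1-D points, the syncInterval parameter, and the simple averaging merge are all assumptions made for illustration; java.util.function.Consumer stands in for Flink's Collector, and in a real job the class would implement CoFlatMapFunction instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative stand-in for a CoFlatMapFunction over 1-D points (hypothetical names). */
public class PeriodicSyncSketch {
    private final double[] centers;  // centroid positions local to this parallel instance
    private final long[] counts;     // points absorbed per centroid since start
    private final int syncInterval;  // emit the whole set once every syncInterval points
    private long seen = 0;

    public PeriodicSyncSketch(double[] initialCenters, int syncInterval) {
        this.centers = initialCenters.clone();
        this.counts = new long[initialCenters.length];
        this.syncInterval = syncInterval;
    }

    /** Input 1: a data point. Updates local state and emits only periodically. */
    public void flatMap1(double point, Consumer<double[]> out) {
        int nearest = 0;
        for (int i = 1; i < centers.length; i++) {
            if (Math.abs(point - centers[i]) < Math.abs(point - centers[nearest])) {
                nearest = i;
            }
        }
        counts[nearest]++;
        // Running mean: move the nearest centroid toward the new point.
        centers[nearest] += (point - centers[nearest]) / counts[nearest];
        seen++;
        if (seen % syncInterval == 0) {
            // One emitted element carries the complete centroid set.
            out.accept(centers.clone());
        }
    }

    /** Input 2: a centroid set fed back from one of the parallel instances. */
    public void flatMap2(double[] remote, Consumer<double[]> out) {
        for (int i = 0; i < centers.length; i++) {
            centers[i] = (centers[i] + remote[i]) / 2.0; // assumed merge rule: plain average
        }
        // Nothing is emitted here; emitting would make every feedback element trigger another.
    }

    public double[] centers() {
        return centers.clone();
    }

    public static void main(String[] args) {
        PeriodicSyncSketch op = new PeriodicSyncSketch(new double[] {0.0, 10.0}, 3);
        List<double[]> emitted = new ArrayList<>();
        for (double p : new double[] {1.0, 9.0, 2.0, 11.0}) {
            op.flatMap1(p, emitted::add);
        }
        System.out.println("emitted sets: " + emitted.size());
    }
}
```

The point of the sketch is that most calls to flatMap1 touch only local state, so parallel instances do not wait on each other; the feedback edge is used only once every syncInterval points. With a broadcast feedback edge an instance would presumably also receive the sets it emitted itself, so the merge rule should tolerate that.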


Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
This is exactly what I am confused about. If I understand it correctly, each of the map functions in the co-flatmap receives one tuple at a time. So if I have a datastream of centroids, they would arrive one at a time on the partitions, and that would defeat the purpose.

Are you proposing that I put the entire list of centroids into a single datastream object, so that the map function gets the entire list whenever it is called?

Would it be possible for you to give an example, a code snippet, or a link to some use case of the co-flatmap function?

Thanks a lot for your help throughout.

Regards
Biplob Biswas
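
One way to read the Centroids type in Gyula's snippet is as exactly that: a single stream element that wraps the complete list, so each call on the feedback input sees every centroid at once. The sketch below is an assumption about what such a wrapper could look like, not code from this thread. The field names, the weight counter, and the weighted-average merge rule are hypothetical, and it assumes both sets list the same centroids in the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical wrapper: one stream element carrying the complete centroid list. */
public class Centroids {

    /** A single centroid: position plus the number of points it has absorbed. */
    public static final class Centroid {
        public final int id;
        public final double[] center;
        public final long weight;

        public Centroid(int id, double[] center, long weight) {
            this.id = id;
            this.center = center.clone();
            this.weight = weight;
        }
    }

    private final List<Centroid> items;

    public Centroids(List<Centroid> items) {
        this.items = new ArrayList<>(items);
    }

    public List<Centroid> items() {
        return items;
    }

    /** Merge position by position, weighting each center by the points behind it. */
    public Centroids merge(Centroids other) {
        List<Centroid> merged = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            Centroid a = items.get(i);
            Centroid b = other.items.get(i);
            long total = a.weight + b.weight;
            double[] center = new double[a.center.length];
            for (int d = 0; d < center.length; d++) {
                // With no points on either side, keep the local position unchanged.
                center[d] = total == 0
                        ? a.center[d]
                        : (a.center[d] * a.weight + b.center[d] * b.weight) / total;
            }
            merged.add(new Centroid(a.id, center, total));
        }
        return new Centroids(merged);
    }

    public static void main(String[] args) {
        Centroids local = new Centroids(Arrays.asList(new Centroid(0, new double[] {0.0, 0.0}, 1)));
        Centroids remote = new Centroids(Arrays.asList(new Centroid(0, new double[] {4.0, 8.0}, 3)));
        Centroid m = local.merge(remote).items().get(0);
        System.out.println(Arrays.toString(m.center) + " weight=" + m.weight);
    }
}
```

With a wrapper like this, the feedback input receives one Centroids element per sync rather than one centroid at a time. How two sets should actually be merged depends on the clustering algorithm, so the weighted average here is only a placeholder.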


Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi Gyula,

I tried doing something like the following in the two flatMap functions, but I am not getting the desired results and am still confused about how the concept you put forward would work:

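public static final class MyCoFlatmap implements CoFlatMapFunction<Point, Centroid, Centroid> {

    // Local copy of the centroids held by this parallel instance.
    // flag, id, numofMC and timestamp, as well as distance(), getRadius()
    // and CentroidCreator, are defined elsewhere in the job (not shown).
    Centroid[] centroids;

    @Override
    public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
        if (flag) {
            centroids = new Centroid[numofMC];
            flag = false;
        }
        if (id < numofMC) {
            // The first numofMC points are used to create the initial centroids.
            System.out.println(id);
            Centroid generatedMC = CentroidCreator.generateCentroid(id, timestamp, in);
            centroids[id] = generatedMC;
            out.collect(generatedMC);
            id++;
        } else {
            // Find the centroid closest to the incoming point.
            Centroid closestMC = null;
            double minDistance = Double.MAX_VALUE;
            for (Centroid mc : centroids) {
                double distance = distance(in.pt, mc.getCenter());
                if (distance < minDistance) {
                    closestMC = mc;
                    minDistance = distance;
                }
            }
            // Absorb the point only if it falls within the centroid's radius.
            double radius = getRadius(closestMC, centroids);
            if (minDistance < radius) {
                closestMC.insert(in.pt, timestamp);
            }
            // The closest centroid is emitted after every single point.
            out.collect(closestMC);
        }
    }

    @Override
    public void flatMap2(Centroid in, Collector<Centroid> out) throws Exception {
        // A fed-back centroid can arrive before this instance has seen a point.
        if (centroids == null) {
            centroids = new Centroid[numofMC];
        }
        // Replace the local centroid with the one fed back through the iteration.
        centroids[in.id] = in;
        System.out.println("MC: " + in.toString());
    }
}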

As mentioned in my previous reply, I understand that each of the two functions in the CoFlatMap receives one tuple at a time. So if I have a DataStream of centroids, they would arrive one at a time at the partitions, and that would defeat the purpose, because I need all of the centroids to compare distances against.

I tried storing the centroids in an array of Centroid, but again I don't understand how I can push all of the changes back.

A small example or code snippet would really be helpful.

Thanks a lot

Regards
Biplob

Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi Gyula,

Even after trying different things, I can't seem to get the hang of this. I also asked another question about how iterations work in streaming here <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-td6848.html>

It is not working the way I expect: the input stream is completely consumed before anything is sent back and iterated.

Could you please point me in the right direction and help me understand this properly?

Thanks and Regards
Biplob Biswas

Re: Regarding Broadcast of datasets in streaming context

Gyula Fóra
Hi,

If you haven't done so yet, please read the relevant part of the streaming docs: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#iterations

Iterations just allow you to define cyclic flows; there is nothing magic about them. If your original input stream is finite, there is no guarantee on the relative order of your input and feedback streams, so it can easily happen that the original input is fully consumed before any feedback is received.
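
To make the ordering point concrete, here is a minimal plain-Java sketch. It is not Flink code: FeedbackOrderSketch, run and the preferInput switch are made-up names, and the two queues only stand in for the input edge and the feedback edge of an iteration. The switch plays the role of a scheduling decision that a real job does not let you control.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
class FeedbackOrderSketch {

    // Processes a finite input and its feedback, logging the order in which records are read.
    // preferInput decides which queue is read when both hold records.
    static List<String> run(boolean preferInput) {
        Deque<Integer> input = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        Deque<Integer> feedback = new ArrayDeque<>();
        List<String> log = new ArrayList<>();

        while (!input.isEmpty() || !feedback.isEmpty()) {
            boolean takeInput = !input.isEmpty() && (preferInput || feedback.isEmpty());
            if (takeInput) {
                int v = input.poll();
                log.add("in:" + v);
                feedback.add(v);      // every input record produces one feedback record
            } else {
                int v = feedback.poll();
                log.add("fb:" + v);   // feedback records are consumed, not re-emitted
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [in:1, in:2, in:3, fb:1, fb:2, fb:3]
        System.out.println(run(false));  // [in:1, fb:1, in:2, fb:2, in:3, fb:3]
    }
}
```

Both logs are legal outcomes of the same cyclic flow. With a finite input the first one, where every input record is read before any feedback record, is a perfectly possible execution.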

Also, broadcast itself has nothing to do with iterations. It is a partitioning pattern, which just means that each tuple sent will be received by all downstream instances. You have to build what you need on top of these abstractions.
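
As a rough sketch of how broadcast can emulate shared state, consider the plain-Java simulation below. It is not Flink code: BroadcastFeedbackSketch, Instance, Update and the halfway update rule are invented for illustration, and the centroids are one-dimensional to keep it short. Each instance keeps its own full copy of the centroids, a point goes to exactly one instance, and the single update that results is delivered to every instance, which is what a broadcast edge would do.

```java
import java.util.Arrays;

// Hypothetical illustration only; none of these names exist in Flink.
class BroadcastFeedbackSketch {

    // One centroid update: which slot changed and its new value.
    static final class Update {
        final int id;
        final double value;

        Update(int id, double value) {
            this.id = id;
            this.value = value;
        }
    }

    // One parallel instance of a two-input operator with its own copy of all centroids.
    static final class Instance {
        final double[] centroids;

        Instance(double[] initial) {
            this.centroids = initial.clone();
        }

        // Input 1: a data point. Computes the update but leaves the local copy untouched.
        Update onPoint(double p) {
            int closest = 0;
            for (int i = 1; i < centroids.length; i++) {
                if (Math.abs(p - centroids[i]) < Math.abs(p - centroids[closest])) {
                    closest = i;
                }
            }
            // Toy rule (an assumption): move the closest centroid halfway towards the point.
            return new Update(closest, (centroids[closest] + p) / 2.0);
        }

        // Input 2: a broadcast update. Overwrites one slot of the local copy.
        void onUpdate(Update u) {
            centroids[u.id] = u.value;
        }
    }

    static double[][] simulate() {
        double[] initial = {0.0, 10.0};
        double[] points = {1.0, 9.0, 2.0, 8.0};
        Instance[] instances = {new Instance(initial), new Instance(initial)};

        for (int n = 0; n < points.length; n++) {
            // Each point is handled by exactly one instance (round robin here).
            Update u = instances[n % instances.length].onPoint(points[n]);
            // Broadcast: every instance, including the sender, receives the update.
            for (Instance inst : instances) {
                inst.onUpdate(u);
            }
        }
        return new double[][] {instances[0].centroids, instances[1].centroids};
    }

    public static void main(String[] args) {
        for (double[] c : simulate()) {
            System.out.println(Arrays.toString(c));  // [1.25, 8.75] for both instances
        }
    }
}
```

Note that the updates arrive one at a time, yet every instance always holds the complete set. The simulation delivers each update immediately; in a real job delivery is asynchronous, so an instance may process a few points against slightly stale centroids before an update reaches it.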

Cheers,
Gyula 

Biplob Biswas <[hidden email]> wrote (on Sun, 15 May 2016, 17:01):
Hi Gyula,

even after trying different things, I can't seem to get the hold of things.
Also, i asked another question on the working of iteration and streaming
here
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unexpected-behaviour-in-datastream-broadcast-td6848.html>

Because its not working the way i am expecting it to be and the inputstream
is completely consumed before anything is sent back and iterated.

Could you please send me to a proper direction and help me in understanding
the things properly?

Thanks and Regards
Biplob Biswas




Re: Regarding Broadcast of datasets in streaming context

Biplob Biswas
Hi,

I have already read that article, but it is very simplistic. Based on it and other examples, I was trying to understand how my centroids can be sent to all the partitions and updated accordingly.

I also understand that the order of the input and feedback streams can't be determined, but I was expecting the feedback to be broadcast after every collect call so that all the partitions receive the updated values.

But now I am confused about how this whole iteration and broadcast approach can even help me maintain a central state of centroids.

I have even tried something similar to this:

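import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

DataStream<Long> mainInput = env.generateSequence(2, 30);
DataStream<Long> initialIterateInput = env.fromElements(i);

// Open an iteration whose feedback edge carries Long[] instead of Long.
IterativeStream.ConnectedIterativeStreams<Long, Long[]> iteration =
        mainInput.iterate().withFeedbackType(Long[].class);

DataStream<Long[]> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<Long, Long[], Long[]>() {
            long globalVal = 1;
            Long[] arr;
            boolean flag = true;
            int i = 0;

            // Input 1: values from mainInput.
            @Override
            public void flatMap1(Long value, Collector<Long[]> out) throws Exception {
                if (flag) {
                    // Allocate the local array once, on the first value.
                    arr = new Long[10];
                    flag = false;
                }
                Thread.sleep(1000);
                // Store the value only while there is room in the array.
                if (i < arr.length) {
                    arr[i] = value;
                    i++;
                }
                System.out.println("SEEING FROM INPUT 1: " + Arrays.toString(arr) + ", " + globalVal);
                // Emit a copy so that later local writes do not change the record in flight.
                out.collect(arr.clone());
            }

            // Input 2: arrays arriving on the broadcast feedback edge.
            @Override
            public void flatMap2(Long[] value, Collector<Long[]> out) throws Exception {
                Thread.sleep(1000);
                if (arr == null) {
                    // Feedback can reach an instance that has not seen any input yet.
                    arr = new Long[10];
                    flag = false;
                }
                for (int i = 0; i < value.length; i++) {
                    arr[i] = value[i];
                }
                System.out.println("SEEING FROM INPUT 2: " + Arrays.toString(arr) + ", " + globalVal);

                //out.collect(value);
            }
        });

// Close the loop: everything emitted by iterateHead is broadcast back to all instances.
iteration.closeWith(iterateHead.broadcast());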

Here arr stands for the array of my centroids, and the value in the first map function stands for the points coming from the input stream.
I made this example to run a small streaming scenario and look at the results being printed.

I started working on this based on the idea that, once a value is collected, the broadcast supplies the latest centroids on each iteration for each point.

That is why I keep asking and giving you updates on what I did and what I am doing, but unless I understand how this central state of centroids is emulated, I can't move forward.

So could you please provide a small example, a snippet, or anything else that shows how you propose to keep a central state and when to update it? Without this basic understanding I am not able to do anything.

Thanks a lot.

Regards
Biplob