Hi, I have yet another question, this time about maintaining a global list of centroids.
I am trying to implement the CluStream algorithm, and for that purpose I have the initial set of centres in a Flink DataSet. Now I need to update the set of centres for every data tuple that comes from the stream. From what I have read so far in two different posts with similar questions, the suggestion for streaming data was to use the co-map operator and handle the two inputs in two separate map functions. My idea was to broadcast the DataSet to each Flink partition and, whenever a data tuple is mapped to a partition by a map function, update the broadcast DataSet. As this is currently not possible, I was thinking of broadcasting the DataStream using "ds.broadcast()" so that every partition receives each streamed tuple. Then I would use a normal flatMap function for the centres, use the broadcast tuple to update them, and return the updated set of centres. My question is: would this work? If yes, could someone give an example of the DataStream broadcast function and show how to retrieve the broadcast stream in a map function?
I am pretty new to Flink, so can anyone at least give me an example of how the DataStream.broadcast() method works? From the documentation I get the following:
broadcast(): "Sets the partitioning of the DataStream so that the output elements are broadcasted to every parallel instance of the next operation."
If the output elements are broadcast, how are they retrieved? Or am I looking at this method in a completely wrong way?
Thanks,
Biplob Biswas
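A note on what `broadcast()` means in practice: there is no separate step to "retrieve" broadcast elements. With `input.broadcast().map(new MyMapper())` (`MyMapper` being a hypothetical user function), every parallel instance of `MyMapper` receives every element through its ordinary `map()` call. The sketch below is a plain-Java simulation of that distribution, not Flink code, contrasted with a round-robin split of the kind `rebalance()` performs; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation (not Flink code) of how records leaving one operator
 * are distributed over the parallel instances of the next operator.
 */
public class BroadcastDemo {

    /** Like broadcast(): every downstream instance receives every record. */
    static List<List<Integer>> broadcast(List<Integer> records, int parallelism) {
        List<List<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            inboxes.add(new ArrayList<>(records)); // a full copy per instance
        }
        return inboxes;
    }

    /** Round-robin (similar in spirit to rebalance()): each record goes to exactly one instance. */
    static List<List<Integer>> roundRobin(List<Integer> records, int parallelism) {
        List<List<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            inboxes.add(new ArrayList<>());
        }
        for (int r = 0; r < records.size(); r++) {
            inboxes.get(r % parallelism).add(records.get(r));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<Integer> points = List.of(10, 20, 30, 40);
        // Each inner list is what one parallel map instance would see, in order.
        System.out.println("broadcast:   " + broadcast(points, 2));
        System.out.println("round-robin: " + roundRobin(points, 2));
    }
}
```

Running `main` prints the full list once per instance for broadcast (`[[10, 20, 30, 40], [10, 20, 30, 40]]`) but a split for round-robin (`[[10, 30], [20, 40]]`). Broadcasting replicates data; it does not give the instances any shared state.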
Hi Biplob,
one of our developers had a stream clustering example a while back. It used a broadcast feedback edge with a co-operator to update the centroids. I'll include him directly in this email so that he notices it and can send you the example.
Cheers,
Aljoscha

(Replying to Biplob Biswas, Thu, 28 Apr 2016, 13:57.)
That would be really great; any example would help me proceed with my work.
Thanks a lot.
Hi Biplob,
I have implemented a similar algorithm, as Aljoscha mentioned. First, let me clarify a few things.

There is currently no abstraction for keeping objects (in your case, centroids) in a centralized place where they can be updated and read by all operators. This would probably be very costly, and it is actually not necessary in your case.

Broadcasting a stream, unlike the other partitioning methods, means that the events are replicated to all downstream operator instances. It is not a magical operator that makes state available across parallel instances.

Now let me explain what I think you want from Flink and how to do it :)

You have an input data stream and a set of centroids to be updated based on the incoming records. As you want to do this in parallel, you have an operator (let's say a flatMap) that keeps the centroids locally and updates them on its inputs. You then end up with several independently updated sets of centroids, so you want to merge them and update the centroids in each flatMap. Let's see how to do this.

Given that you have your centroids locally, updating them is easy, so I will not talk about that. The problematic part is periodically merging and "broadcasting" the centroids so that all the flatMaps eventually see the same ones (for clustering they probably don't have to be identical at all times). There is no operator for sending state (centroids) between subtasks, so you have to be clever here. We can use cyclic streams to solve this problem by sending the centroids as ordinary events to a CoFlatMap:

    DataStream<Point> input = ...;

    ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids =
            input.iterate().withFeedbackType(Centroids.class);

    DataStream<Centroids> updatedCentroids = inputsAndCentroids.flatMap(new MyCoFlatmap());

    inputsAndCentroids.closeWith(updatedCentroids.broadcast());

MyCoFlatmap would be a CoFlatMapFunction that, on one input, receives events and updates its local centroids (periodically outputting them), and, on the other input, receives the centroids sent by the other flatMaps and merges them into its local ones.

This might be a lot to take in at first, so you may want to read up on streaming iterations and connected streams before you start. Let me know if this makes sense.

Cheers,
Gyula

(Replying to Biplob Biswas, Thu, 28 Apr 2016, 14:41.)
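The "keep centroids locally, update on every input, merge periodically" idea can be sketched in plain Java (no Flink dependency, so it runs on its own). This is a minimal sketch under assumptions: the class name is hypothetical, the local update is a running mean of the points assigned to each centroid, and the merge rule (a count-weighted average per centroid index) is just one simple choice. The thread does not prescribe a merge strategy, and CluStream's micro-cluster bookkeeping is more involved.

```java
/**
 * Hypothetical per-instance centroid state (plain Java, not a Flink API).
 * Each parallel flatMap instance would own one of these.
 */
public class LocalCentroids {
    final double[][] centers; // one row per centroid
    final long[] counts;      // number of points absorbed by each centroid

    LocalCentroids(double[][] initialCenters) {
        centers = new double[initialCenters.length][];
        for (int i = 0; i < initialCenters.length; i++) {
            centers[i] = initialCenters[i].clone(); // deep copy, so instances don't share arrays
        }
        counts = new long[initialCenters.length];
    }

    /** Index of the centroid closest to p, by squared Euclidean distance. */
    int nearest(double[] p) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centers.length; i++) {
            double d = 0;
            for (int k = 0; k < p.length; k++) {
                double diff = p[k] - centers[i][k];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        return best;
    }

    /** Local update: the nearest centroid becomes the running mean of the points assigned to it. */
    void add(double[] p) {
        int i = nearest(p);
        counts[i]++;
        for (int k = 0; k < p.length; k++) {
            centers[i][k] += (p[k] - centers[i][k]) / counts[i];
        }
    }

    /**
     * Merge another instance's snapshot: count-weighted average per centroid index.
     * Simplification: merging the same snapshot twice would count its points twice.
     */
    void merge(LocalCentroids other) {
        for (int i = 0; i < centers.length; i++) {
            long total = counts[i] + other.counts[i];
            if (total == 0) {
                continue; // neither side has seen a point for this centroid yet
            }
            for (int k = 0; k < centers[i].length; k++) {
                centers[i][k] = (centers[i][k] * counts[i] + other.centers[i][k] * other.counts[i]) / total;
            }
            counts[i] = total;
        }
    }

    public static void main(String[] args) {
        double[][] init = {{0, 0}, {10, 10}};
        LocalCentroids a = new LocalCentroids(init); // "subtask 1"
        LocalCentroids b = new LocalCentroids(init); // "subtask 2"
        a.add(new double[]{1, 1});
        a.add(new double[]{3, 3});
        b.add(new double[]{4, 4});
        b.add(new double[]{9, 9});
        a.merge(b); // e.g. after b's snapshot arrives on the feedback edge
        System.out.println(java.util.Arrays.deepToString(a.centers));
    }
}
```

In a Flink job, each parallel instance of the CoFlatMap would own one such object: `add` corresponds to handling a point on the first input, and `merge` to handling a snapshot arriving on the feedback input.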
Hi Gyula,
I read your workaround and started reading about Flink iterations, CoFlatMap operators and other things. I do understand a few things now, but the solution you provided is not completely clear to me. Here is what I understand from your post:

1. You initially have a DataStream of points, on which you iterate, and 'withFeedbackType' defines the type of the connected (feedback) stream, so rather than "Points" the type is now "Centroids".
2. On this connected stream (which, as I understand it, only contains the streamed points right now), you run a flatMap operator. You mention: "MyCoFlatmap would be a CoFlatMapFunction which on 1 input receive events and update its local centroids (and periodically output the centroids) and on the other input would send centroids of other flatmaps and would merge them to the local." I don't understand this part completely. If I am not wrong, you are saying that the CoFlatMap function would have two map functions, but what specifically am I doing in each of them?
3. Lastly, the updated centroids that come out of the CoFlatMap function are fed back into the stream, and this is where I get lost again: how are these centroids fed back, and if they are, what happens to the point stream? And how do I catch them in the CoFlatMap function?

If I understand this correctly, in your code the first set of centroids is created in the CoFlatMap function, and you don't already have a list of centroids to start with. Am I assuming correctly?

I went through the iteration in the KMeans example at the following link: https://github.com/stratosphere/stratosphere/blob/master/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/clustering/KMeans.java and I understand how that works, but I am still not clear on how your example works. Could you please explain it a bit more, perhaps with some examples?

Thanks a lot.
Hi Gyula,
I understand more now how this might work, and it's fascinating. I still have one question about the CoFlatMap function, though. First, let me explain what I understand, so you can tell me whether it's correct:

1. The connected iterative stream ensures that the CoFlatMap function receives the points and the centroids, which are broadcast on each iteration defined by closeWith.
2. So in the CoFlatMap function, one map function gets the points and the other gets the broadcast centroids.

Now comes the part where I am assuming a bit, because I don't understand it from the theory:

3. Assuming I can use the broadcast centroids, I find the centroid nearest to the streamed point, update that centroid, and use only one of the collectors to return the updated list of centroids.

My question here is: I am assuming this operation is not done in parallel, because if the streams are processed in parallel, how would I ensure a correct update of the centroids when multiple points could try to update the same centroid at the same time?

I hope I made myself clear.

Thanks and regards,
Biplob
Hey, I think you got the right idea :) Your CoFlatMap will receive all the centroids that you have sent to the stream in the closeWith call. This means that whenever you collect a new set of centroids, they will be iterated back, so you don't want to send the centroids out on the collector every time, only periodically. The order in which they arrive is pretty much arbitrary, so if ordering is important, you need to add some logic by which you can order them.

I'm not sure if this helped or not :D
Gyula

(Replying to Biplob Biswas, Mon, 2 May 2016, 13:13.)
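One possible way to add the ordering logic mentioned above, sketched in plain Java with hypothetical names: each emitted snapshot carries the id of the sending instance and a per-sender version counter, and the receiver keeps only the newest snapshot per sender, so a late, older snapshot cannot overwrite a newer one. This is an assumption about how one might do it, not a Flink feature.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Plain-Java sketch (hypothetical names, not a Flink API) of one way to cope with
 * feedback snapshots arriving in arbitrary order: tag each snapshot with the
 * sending instance and a per-sender version counter, and drop anything stale.
 */
public class OrderedFeedback {

    /** A centroid snapshot as it might travel on the feedback edge. */
    static final class Snapshot {
        final int senderId;       // which parallel instance produced it
        final long version;       // incremented by the sender on every emission
        final double[][] centers;

        Snapshot(int senderId, long version, double[][] centers) {
            this.senderId = senderId;
            this.version = version;
            this.centers = centers;
        }
    }

    /** Keeps only the newest snapshot seen from each sender. */
    static final class LatestSnapshots {
        private final Map<Integer, Snapshot> latest = new HashMap<>();

        /** Returns true if s is newer than what we hold for its sender (and stores it). */
        boolean offer(Snapshot s) {
            Snapshot current = latest.get(s.senderId);
            if (current != null && current.version >= s.version) {
                return false; // stale or duplicate: it arrived out of order
            }
            latest.put(s.senderId, s);
            return true;
        }

        /** Version currently held for a sender, or -1 if nothing has arrived yet. */
        long versionOf(int senderId) {
            Snapshot s = latest.get(senderId);
            return s == null ? -1 : s.version;
        }
    }

    public static void main(String[] args) {
        LatestSnapshots store = new LatestSnapshots();
        double[][] c = {{0, 0}};
        System.out.println(store.offer(new Snapshot(1, 2, c))); // true: first from sender 1
        System.out.println(store.offer(new Snapshot(1, 1, c))); // false: older version arrived late
        System.out.println(store.versionOf(1));                 // 2
    }
}
```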
Hi Gyula,
Could you explain a bit why I wouldn't want the centroids to be collected after every point? Once I get a streamed point via the map1 function, I want to compare its distance to each centroid that arrives via the map2 function, and I keep comparing for every subsequent centroid. Once a centroid has been updated, shouldn't I collect the entire set? That way, a centroid is updated and collected back for the next point in the iteration. I may not be getting the concept properly here, so an example snippet would help a lot in the long run.
Thanks & regards,
Biplob
Hi,
Iterating after every incoming point/centroid update means that you basically defeat the purpose of having parallelism in your Flink job. If you only "sync" the centroids periodically through the broadcast, your program can run efficiently in parallel. This should be fine for machine-learning use cases, where the results should converge anyway.
Gyula

(Replying to Biplob Biswas, Mon, 2 May 2016, 17:02.)
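To put rough numbers on the cost of syncing after every point: with parallelism p and a broadcast feedback edge, every emitted snapshot is delivered to all p instances, each of which then has to merge it. The plain-Java calculation below assumes the points are spread evenly and that each instance emits one full snapshot every `emitEvery` points; the figures are illustrative, not measurements.

```java
/**
 * Back-of-the-envelope count (plain Java) of how many records travel on a
 * broadcast feedback edge. Assumes every instance receives the same number of
 * points and emits one full centroid snapshot every emitEvery points.
 */
public class FeedbackVolume {

    static long feedbackRecords(long pointsPerInstance, int parallelism, long emitEvery) {
        long snapshotsPerInstance = pointsPerInstance / emitEvery; // integer division: a partial period emits nothing
        long snapshotsTotal = snapshotsPerInstance * parallelism;
        return snapshotsTotal * parallelism; // broadcast() delivers each snapshot to every instance
    }

    public static void main(String[] args) {
        // 1,000,000 points per instance, 4 parallel instances
        System.out.println(feedbackRecords(1_000_000, 4, 1));     // 16000000: emit after every point
        System.out.println(feedbackRecords(1_000_000, 4, 1_000)); // 16000: emit every 1000 points
    }
}
```

Syncing every 1,000 points instead of after every point cuts the feedback traffic, and the merge work that comes with it, by the same factor of 1,000.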
This is exactly what I am confused about. If I understand it correctly, each of the map functions in the CoFlatMap receives one tuple at a time, so if I have a DataStream of centroids, they would arrive one at a time in the partitions, and that would defeat the purpose.
Are you proposing that I put the entire list of centroids into a single DataStream element, so that the map function gets the entire list whenever it is called? Would it be possible for you to give an example, a code snippet, or a link to some use case of the CoFlatMap function? Thanks a lot for your help throughout.
Regards,
Biplob Biswas
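On putting the whole list into a single element: the feedback record can be the entire centroid set rather than one centroid, which is what the `Centroids` type in Gyula's snippet suggests. The plain-Java mock-up below has the same two-method shape as a `CoFlatMapFunction` (`flatMap1` for points, `flatMap2` for fed-back centroid sets) but does not implement Flink's interface, so it runs without Flink; a `java.util.function.Consumer` stands in for the `Collector`. The learning rate, the sync period and the averaging merge rule are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Plain-Java mock-up of a two-input operator in the shape of a CoFlatMapFunction.
 * It does NOT implement Flink's interface; a Consumer stands in for the Collector.
 * One feedback record carries the WHOLE centroid set (double[][]).
 */
public class CentroidCoFlatMapSketch {
    private static final double LEARNING_RATE = 0.1; // hypothetical, for illustration only
    private final double[][] centers;
    private final int emitEvery; // hypothetical sync period, in points
    private long seen = 0;

    CentroidCoFlatMapSketch(double[][] initialCenters, int emitEvery) {
        this.centers = new double[initialCenters.length][];
        for (int i = 0; i < initialCenters.length; i++) {
            this.centers[i] = initialCenters[i].clone();
        }
        this.emitEvery = emitEvery;
    }

    /** Input 1: a data point. Update the local copy; emit the full set only every emitEvery points. */
    void flatMap1(double[] point, Consumer<double[][]> out) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centers.length; i++) {
            double d = 0;
            for (int k = 0; k < point.length; k++) {
                double diff = point[k] - centers[i][k];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        for (int k = 0; k < point.length; k++) {
            centers[best][k] += LEARNING_RATE * (point[k] - centers[best][k]); // nudge the nearest centroid
        }
        if (++seen % emitEvery == 0) {
            out.accept(copy()); // a copy, so later local updates don't mutate the emitted record
        }
    }

    /** Input 2: a complete centroid set fed back through the broadcast edge. */
    void flatMap2(double[][] received, Consumer<double[][]> out) {
        for (int i = 0; i < centers.length; i++) {
            for (int k = 0; k < centers[i].length; k++) {
                centers[i][k] = (centers[i][k] + received[i][k]) / 2.0; // placeholder merge rule
            }
        }
        // Nothing is emitted here; emitting on every merge would keep the loop spinning.
    }

    double[][] copy() {
        double[][] c = new double[centers.length][];
        for (int i = 0; i < centers.length; i++) {
            c[i] = centers[i].clone();
        }
        return c;
    }

    public static void main(String[] args) {
        List<double[][]> emitted = new ArrayList<>();
        CentroidCoFlatMapSketch f = new CentroidCoFlatMapSketch(new double[][]{{0, 0}, {10, 10}}, 2);
        f.flatMap1(new double[]{1, 1}, emitted::add);
        f.flatMap1(new double[]{1, 1}, emitted::add);
        System.out.println("snapshots emitted: " + emitted.size()); // 1
    }
}
```

In a real job, the same logic would live in a class implementing `CoFlatMapFunction<Point, Centroids, Centroids>` and calling `out.collect(...)`. Because the feedback edge is broadcast, an instance also receives its own snapshot; with this averaging rule that is harmless as long as the local copy has not changed since it was emitted.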
Hi Gyula,
I tried something like the following in the two flatMaps, but I am not getting the desired results, and I am still confused about how the concept you put forward would work:

    // flag, id, numofMC, timestamp, CentroidCreator, distance() and getRadius() are not shown here.
    public static final class MyCoFlatmap implements CoFlatMapFunction<Point, Centroid, Centroid> {

        Centroid[] centroids;

        @Override
        public void flatMap1(Point in, Collector<Centroid> out) throws Exception {
            if (flag) {
                centroids = new Centroid[numofMC];
                flag = false;
            }
            if (id < numofMC) {
                // the first numofMC points each seed a new centroid
                System.out.println(id);
                Centroid generatedMC = CentroidCreator.generateCentroid(id, timestamp, in);
                centroids[id] = generatedMC;
                out.collect(generatedMC);
                id++;
            } else {
                // find the centroid closest to the incoming point
                Centroid closestMC = null;
                double minDistance = Double.MAX_VALUE;
                for (Centroid mc : centroids) {
                    double distance = distance(in.pt, mc.getCenter());
                    if (distance < minDistance) {
                        closestMC = mc;
                        minDistance = distance;
                    }
                }
                // absorb the point only if it falls within the centroid's radius
                double radius = getRadius(closestMC, centroids);
                if (minDistance < radius) {
                    closestMC.insert(in.pt, timestamp);
                }
                out.collect(closestMC);
            }
        }

        @Override
        public void flatMap2(Centroid in, Collector<Centroid> out) throws Exception {
            // a centroid fed back through the iteration replaces the local copy with the same id
            centroids[in.id] = in;
            System.out.println("MC: " + in.toString());
        }
    }

As mentioned in my previous reply, I understand that each of the map functions in the CoFlatMap receives one tuple at a time, so if I have a DataStream of centroids, they arrive one at a time in the partitions. That defeats the purpose, because I need all of the centroids to compute the distances. I tried storing the centroids in an array of Centroid, but again I don't understand how I can push all of the changes back. A small example or code snippet would be really helpful.
Thanks a lot,
regards,
Biplob
Hi Gyula,
even after trying different things, I can't seem to get a hold of this. I also asked another question about how iterations work with streaming here, because it's not working the way I expect: the input stream is completely consumed before anything is sent back and iterated. Could you please point me in the right direction and help me understand this properly?
Thanks and regards,
Biplob Biswas
Hi,
If you haven't done so already, please read the respective part of the streaming docs: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#iterations
Iterations just allow you to define cyclic flows; there is nothing magic about them. If your original input stream is finite, there is no guarantee on the order of your input and feedback streams, so it can easily happen that the original input is consumed before the feedback is received. Also, broadcast again has nothing to do with iterations themselves. It is a partitioning pattern, which just means that each tuple sent will be received by all downstream instances. You have to work around these abstractions.
Cheers,
Gyula

(Replying to Biplob Biswas, Sun, 15 May 2016, 17:01.)
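To picture why a finite input can be consumed before any feedback arrives: below is a toy, single-threaded model in plain Java of one iteration-head instance that happens to serve its input queue first. It does not model Flink's actual network stack or scheduling; it only shows that nothing forces the feedback to be interleaved with the input. Logic in `flatMap1` therefore should not assume that a fresh centroid set has arrived before the next point. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Toy model (not Flink) of one operator instance at an iteration head with two inputs:
 * the original input and the feedback queue. Here the input is always served first.
 */
public class IterationOrderDemo {

    static List<String> run(List<Integer> input, int emitEvery) {
        Deque<Integer> inputQueue = new ArrayDeque<>(input);
        Deque<Integer> feedbackQueue = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        int processed = 0;
        while (!inputQueue.isEmpty() || !feedbackQueue.isEmpty()) {
            if (!inputQueue.isEmpty()) {          // input happens to be available first
                int p = inputQueue.poll();
                processed++;
                log.add("point " + p);
                if (processed % emitEvery == 0) {
                    feedbackQueue.add(processed); // a snapshot goes round the loop
                }
            } else {                              // feedback only once the finite input is drained
                log.add("feedback after " + feedbackQueue.poll() + " points");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of(1, 2, 3, 4), 2).forEach(System.out::println);
    }
}
```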
Hi,
I read that article already, but it is very simplistic, so based on it and other examples I was trying to understand how my centroids can be sent to all the partitions and updated accordingly. I also understood that the order of the input and feedback streams can't be determined, but I was expecting the centroids to be broadcast after every collect call so that all the partitions receive the updated values. Now I am confused about how this whole iteration and broadcast approach can even help me maintain a central state of centroids. I have even tried something like this:

    DataStream<Long> mainInput = env.generateSequence(2, 30);
    DataStream<Long> initialIterateInput = env.fromElements(i); // not used below

    IterativeStream.ConnectedIterativeStreams<Long, Long[]> iteration =
            mainInput.iterate().withFeedbackType(Long[].class);

    DataStream<Long[]> iterateHead = iteration
            .flatMap(new CoFlatMapFunction<Long, Long[], Long[]>() {
                long globalVal = 1;
                Long[] arr;
                boolean flag = true;
                int i = 0;

                @Override
                public void flatMap1(Long value, Collector<Long[]> out) throws Exception {
                    if (flag) {
                        arr = new Long[10];
                        flag = false; // allocate the array only on the first point
                    }
                    Thread.sleep(1000);
                    arr[i] = value;
                    i++;
                    System.out.println("SEEING FROM INPUT 1: " + Arrays.toString(arr) + ", " + globalVal);
                    out.collect(arr);
                }

                @Override
                public void flatMap2(Long[] value, Collector<Long[]> out) throws Exception {
                    Thread.sleep(1000);
                    for (int i = 0; i < value.length; i++) {
                        arr[i] = value[i];
                    }
                    System.out.println("SEEING FROM INPUT 2: " + Arrays.toString(arr) + ", " + globalVal);
                    //out.collect(value);
                }
            });

    iteration.closeWith(iterateHead.broadcast());

Here arr stands for my array of centroids, and the values in the first map are the points coming from the input stream. I ran this example as a small streaming scenario and printed the results. I started working on this based on the idea that the collection is done and then, on each iteration, for each point, the broadcast supplies the latest centroids.
That's why I keep asking you and giving you updates on what I did and what I am doing, but unless I understand how this central state of centroids is emulated, I can't move forward. So I would ask whether you could provide a small example, a snippet, or anything that helps me understand how you propose to keep a central state and when to update it. Without this basic understanding I am not able to do anything.
Thanks a lot.
Regards,
Biplob