Hi. I have been struggling for the past few days to find a solution to the following problem, using Apache Flink:
I have a stream of vectors, represented by files in a local folder. After a new text file is picked up using DataStream<String> text = env.readFileStream(...), I transform the input (with flatMap) into a SingleOutputStreamOperator<Tuple2<String, Integer>, ?>, where the Integer is the score produced by a scoring function. I want to persist a global HashMap containing the top-k vectors, ranked by their scores. I approached the problem using a stateful transformation.

1. The first problem I have is that the HashMap holds per-sink data (so each worker thread has its own HashMap). How can I make it a global collection?
2. Using Apache Spark, I achieved this with JavaPairDStream<String, Integer> stateDstream = tuples.updateStateByKey(updateFunction); and then applying transformations to the stateDstream. Is there a way to get the same functionality using Flink?

Thanks in advance!
|
Hey!
What you are trying to do here is a global rolling aggregation, which is inherently a DOP 1 (degree of parallelism 1) operation. Your observation is correct: if you want to use a simple stateful sink, you need to set its parallelism to 1 in order to get correct results.

What you can do is keep local top-ks in a parallel operator (let's say a flatMap), periodically output the local top-k elements, and merge them in a sink with parallelism=1 to produce a global top-k. I am not 100% sure how you implemented the same functionality in Spark, but you probably achieved the semantics I described above.

The whole problem is much easier if you are interested in the top-k elements grouped by some key, because then you can use partitioned operator states, which will give you correct results with arbitrary parallelism.

Cheers, Gyula

defstat <[hidden email]> wrote (on Sun, Aug 23, 2015, 21:40):
|
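The local top-k plus single merger scheme described in the reply above can be sketched in plain Java, without any Flink API. This is only an illustration under stated assumptions: each vector id is scored exactly once (no per-key aggregation), the class `LocalTopKMerge` and the helpers `scored`, `topK` and `mergeTopK` are hypothetical names, and the two lists stand in for two parallel flatMap instances.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class LocalTopKMerge {

    // Orders entries by ascending score, so the heap head is the weakest entry.
    static final Comparator<Map.Entry<String, Integer>> BY_SCORE =
            Comparator.comparing((Map.Entry<String, Integer> e) -> e.getValue());

    // Hypothetical helper for building (id, score) pairs.
    static Map.Entry<String, Integer> scored(String id, int score) {
        return new SimpleImmutableEntry<>(id, score);
    }

    // Keeps the k best entries in a bounded min-heap,
    // then returns them from highest to lowest score.
    static List<Map.Entry<String, Integer>> topK(List<Map.Entry<String, Integer>> input, int k) {
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(BY_SCORE);
        for (Map.Entry<String, Integer> e : input) {
            heap.add(e);
            if (heap.size() > k) {
                heap.poll(); // drop the current lowest score
            }
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        result.sort(BY_SCORE.reversed());
        return result;
    }

    // Stands in for the parallelism-1 sink: merges the local top-k lists.
    static List<Map.Entry<String, Integer>> mergeTopK(
            List<List<Map.Entry<String, Integer>>> locals, int k) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> local : locals) {
            all.addAll(local);
        }
        return topK(all, k);
    }

    public static void main(String[] args) {
        // Each list plays the role of one parallel instance's input.
        List<Map.Entry<String, Integer>> instance1 =
                Arrays.asList(scored("v1", 7), scored("v2", 3), scored("v3", 9));
        List<Map.Entry<String, Integer>> instance2 =
                Arrays.asList(scored("v4", 8), scored("v5", 1), scored("v6", 5));

        List<Map.Entry<String, Integer>> global =
                mergeTopK(Arrays.asList(topK(instance1, 2), topK(instance2, 2)), 2);
        System.out.println(global); // prints [v3=9, v4=8]
    }
}
```

Under the assumption that no id is ever re-scored or summed across instances, every member of the global top-k is also in the local top-k of the instance that saw it, so the merged result matches a single-instance computation.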
Hi, I wanted to post something along the same lines, but now I don't think the approach with local top-ks and merging works. For example, say you want the top-4 and you do the pre-processing in two parallel instances. This input data would lead to incorrect results:

Instance 1: a 6, b 5, c 4, d 3
Instance 2: e 10, f 9, g 8, h 7, a 6, b 5, c 4, d 3

Each parallel instance would forward its local top-4, which would lead to the end result:

e 10, f 9, g 8, h 7

Which is wrong. I think no matter how many elements you forward, you can construct cases that lead to wrong results. (The problem seems to be that top-k is inherently global.) Might also be that I'm tired and not seeing this right... :D

For the case where your elements are partitioned by some key you should be fine, though, as Gyula mentioned.

I'm not familiar with the Spark API, so maybe you can help me out: what does updateStateByKey() do if your state is not actually partitioned by a key? Plus, I'm curious in general what Spark does with this call.

Cheers, Aljoscha

On Sun, 23 Aug 2015 at 22:28 Gyula Fóra <[hidden email]> wrote:
|
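The counterexample in the post above can be replayed in a few lines of plain Java. This is a sketch under assumptions, not Flink code: it assumes that scores for the same key are summed (as the follow-up reply mentions), and the class `TopKCounterexample` and the helpers `parse`, `sumAll` and `topK` are hypothetical names. Ties are broken by key so that the output is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopKCounterexample {

    // Parses "a 6 b 5" into per-key sums (hypothetical input notation).
    static Map<String, Integer> parse(String spec) {
        String[] tokens = spec.trim().split("\\s+");
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i + 1 < tokens.length; i += 2) {
            sums.merge(tokens[i], Integer.parseInt(tokens[i + 1]), Integer::sum);
        }
        return sums;
    }

    // Adds up the per-key sums of several partial results.
    static Map<String, Integer> sumAll(List<Map<String, Integer>> parts) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> part : parts) {
            for (Map.Entry<String, Integer> e : part.entrySet()) {
                total.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return total;
    }

    // Returns the k keys with the highest sums, ties broken by key.
    static Map<String, Integer> topK(Map<String, Integer> sums, int k) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(sums.entrySet());
        entries.sort((x, y) -> {
            int byScore = Integer.compare(y.getValue(), x.getValue());
            return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
        });
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(k, entries.size()))) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> instance1 = parse("a 6 b 5 c 4 d 3");
        Map<String, Integer> instance2 = parse("e 10 f 9 g 8 h 7 a 6 b 5 c 4 d 3");

        // Merge only what each instance forwards: its local top-4.
        Map<String, Integer> merged =
                topK(sumAll(Arrays.asList(topK(instance1, 4), topK(instance2, 4))), 4);
        // Reference result: sum everything first, then take the top-4.
        Map<String, Integer> exact = topK(sumAll(Arrays.asList(instance1, instance2)), 4);

        System.out.println(merged); // prints {e=10, f=9, g=8, h=7}
        System.out.println(exact);  // prints {a=12, b=10, e=10, f=9}
    }
}
```

The two printed maps differ because instance 2 never forwards its partial sums for a, b, c and d, so the merger cannot see that a and b belong in the global top-4.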
Hey, I am not sure I get it. Why aren't the results correct? You don't instantly get the global top-k, but you are always updating it with the new local results. Gyula

Aljoscha Krettek <[hidden email]> wrote (on Sun, Aug 23, 2015, 22:58):
|
In the example the result is not correct because the values for a, b, c and d are never forwarded from instance 2, even though they would modify the global top-k result. It works, though, if you partition by the key field (tuple field 0, in this case) before doing the summation and local top-k. I think.

Best, Aljoscha

On Sun, 23 Aug 2015 at 23:07 Gyula Fóra <[hidden email]> wrote:
|
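The fix suggested in the post above (partition by key before the summation and local top-k) can be illustrated with the same data in plain Java. This is a sketch under assumptions rather than Flink code: routing by `Math.floorMod(key.hashCode(), parallelism)` is only a stand-in for a real key partitioner, and the class `KeyPartitionedTopK` and its helpers are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyPartitionedTopK {

    // Routes every record to one instance chosen by its key and sums per key,
    // so all records of a given key end up in the same instance.
    static List<Map<String, Integer>> partitionAndSum(String[] keys, int[] scores, int parallelism) {
        List<Map<String, Integer>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new HashMap<>());
        }
        for (int i = 0; i < keys.length; i++) {
            int target = Math.floorMod(keys[i].hashCode(), parallelism);
            instances.get(target).merge(keys[i], scores[i], Integer::sum);
        }
        return instances;
    }

    // Returns the k keys with the highest sums, ties broken by key.
    static Map<String, Integer> topK(Map<String, Integer> sums, int k) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(sums.entrySet());
        entries.sort((x, y) -> {
            int byScore = Integer.compare(y.getValue(), x.getValue());
            return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
        });
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(k, entries.size()))) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    // Merges the local top-k of every instance; keys never overlap across instances.
    static Map<String, Integer> globalTopK(List<Map<String, Integer>> instances, int k) {
        Map<String, Integer> candidates = new HashMap<>();
        for (Map<String, Integer> instance : instances) {
            candidates.putAll(topK(instance, k));
        }
        return topK(candidates, k);
    }

    public static void main(String[] args) {
        // All twelve records from the counterexample, in arrival order.
        String[] keys = {"a", "b", "c", "d", "e", "f", "g", "h", "a", "b", "c", "d"};
        int[] scores = {6, 5, 4, 3, 10, 9, 8, 7, 6, 5, 4, 3};

        Map<String, Integer> result = globalTopK(partitionAndSum(keys, scores, 2), 4);
        System.out.println(result); // prints {a=12, b=10, e=10, f=9}
    }
}
```

Because each key's full sum lives in exactly one instance, any key in the global top-k is also in its own instance's local top-k, so forwarding only k entries per instance loses nothing.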
Hi,
Okay, then I understood correctly. My point was something different. I never said that the approach I suggested would produce results identical to the continuous DOP 1 top-k, because that's impossible to parallelize. What I suggested is to apply batch (or window) updates, which would periodically give you the "current" top-k (so some updates will be overwritten before being sent to the output). Whether this is feasible depends on the application, but it should probably be fine.

Cheers, Gyula

On Mon, Aug 24, 2015 at 8:46 AM Aljoscha Krettek <[hidden email]> wrote:
|