Hello everyone!
I want to implement a streaming algorithm such as Misra-Gries or Space Saving in Flink. The goal is to maintain the heavy hitters of my (possibly unbounded) input streams for as long as my application runs. More precisely, I want a continuously running task that executes the Space Saving algorithm and updates a data structure that other tasks of my Flink application (e.g. map, flatMap) can access at ad-hoc times. However, I am not sure how to achieve this. First, is it possible to have a structure in my main function that is updated by one task at all times and is also accessible to other transformations at ad-hoc times? Any ideas on how to implement the above are more than welcome. Thanks in advance. Best, Max -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
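For reference, the core of the Space Saving algorithm mentioned above fits in a few lines. The following is a minimal, single-threaded Java sketch with no Flink dependency (the class and method names are hypothetical): it keeps at most `capacity` counters, and when an unmonitored item arrives while all counters are in use, it evicts the item with the smallest count and gives the newcomer that count plus one. The linear scan for the minimum keeps the sketch short; the original Stream-Summary structure finds the minimum in constant time.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal single-threaded Space Saving summary (hypothetical class, no Flink dependency).
final class SpaceSaving<T> {
    private final int capacity;                       // maximum number of monitored items
    private final Map<T, Long> counts = new HashMap<>();

    SpaceSaving(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Processes one stream element.
    void offer(T item) {
        Long current = counts.get(item);
        if (current != null) {
            counts.put(item, current + 1);            // already monitored: increment
        } else if (counts.size() < capacity) {
            counts.put(item, 1L);                     // free counter: start monitoring
        } else {
            // All counters in use: evict the item with the smallest count
            // (linear scan) and let the newcomer inherit that count plus one.
            T minItem = null;
            long minCount = Long.MAX_VALUE;
            for (Map.Entry<T, Long> e : counts.entrySet()) {
                if (e.getValue() < minCount) {
                    minCount = e.getValue();
                    minItem = e.getKey();
                }
            }
            counts.remove(minItem);
            counts.put(item, minCount + 1);
        }
    }

    // Estimated count: never below the true count for a monitored item; 0 if not monitored.
    long estimate(T item) {
        return counts.getOrDefault(item, 0L);
    }

    // Monitored items, highest estimated count first.
    List<Map.Entry<T, Long>> topK() {
        List<Map.Entry<T, Long>> entries = new ArrayList<>();
        for (Map.Entry<T, Long> e : counts.entrySet()) {
            entries.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        return entries;
    }

    public static void main(String[] args) {
        SpaceSaving<String> ss = new SpaceSaving<>(2);
        for (String s : new String[] {"a", "a", "a", "b", "c"}) {
            ss.offer(s);
        }
        System.out.println(ss.topK()); // prints [a=3, c=2]
    }
}
```

Here `c` is reported with 2 although it occurred once: it inherited the count of the evicted `b`. That one-sided overestimate is the error Space Saving accepts in exchange for bounded memory.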
Hi Max, state (keyed or operator state) is always local to the task. By default, it is not accessible (for reading or writing) from outside or from other tasks of the application. Flink's Queryable State feature was designed to let external applications access the state of Flink applications. However, it should also work from inside the same job. On 2017-11-30 6:01 GMT+01:00, m@xi wrote: Hello everyone!
Hi Fabian,
Thanks for your answer. Initially, I excluded Queryable State as an option because the documentation explicitly says it is meant for querying state from outside Flink. Reading the documentation now, I am not sure how to achieve this: I have to set ports and addresses, which I am not sure I should do, since I am reading the queryable state from inside the same job. Could you or someone else explain how I can read the queryable state of a specific task from another task (e.g. a map)? Best, Max
Hi,
On 2017-12-05 9:52 GMT+01:00, m@xi wrote: Hi Fabian,
Hi Max,
You are right that Queryable State is not designed as a means for a job to query its own state. In fact, since you do not know the job ID of your job from within the job itself, I do not think you can use Queryable State in your scenario. What you can do is have a flatMap that computes the hot keys or heavy hitters and emits the elements themselves as its main output, for further processing, and the computed statistics as a side output. The side output is itself a data stream, so you can write it to an external storage system (e.g. a key-value store) and use Async I/O to query that system downstream. This gives all tasks access to the state. It is a simple solution, but I am not sure about its performance implications; you can try it and see whether it fits your needs. Thanks, Kostas
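Kostas's suggestion can be illustrated without a cluster. The sketch below is a plain-Java simulation, not Flink code: two `Consumer`s stand in for the main and side outputs, a `ConcurrentHashMap` stands in for the external key-value store, and exact counts stand in for the Space Saving summary so the block stays self-contained (all names are hypothetical). In an actual job, side outputs are emitted through the `Context` of a `ProcessFunction` (`ctx.output(outputTag, value)`), so the operator would be a `ProcessFunction` rather than a plain `FlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Plain-Java simulation (not Flink code) of an operator that forwards every element
// on its main output and periodically publishes a statistics snapshot on a side output.
final class StatsSideOutputSim {
    private final int emitEvery;                              // publish a snapshot every N elements
    private final Consumer<String> mainOutput;                // stands in for the main output
    private final Consumer<Map<String, Long>> sideOutput;     // stands in for ctx.output(tag, ...)
    private final Map<String, Long> counts = new HashMap<>(); // exact counts replace Space Saving here
    private long seen = 0;

    StatsSideOutputSim(int emitEvery, Consumer<String> mainOutput,
                       Consumer<Map<String, Long>> sideOutput) {
        this.emitEvery = emitEvery;
        this.mainOutput = mainOutput;
        this.sideOutput = sideOutput;
    }

    void process(String element) {
        counts.merge(element, 1L, Long::sum);
        mainOutput.accept(element);                           // element continues downstream unchanged
        if (++seen % emitEvery == 0) {
            sideOutput.accept(new HashMap<>(counts));         // publish a copy, not the live map
        }
    }

    public static void main(String[] args) {
        Map<String, Long> kvStore = new ConcurrentHashMap<>(); // stands in for the external KV store
        List<String> downstream = new ArrayList<>();
        StatsSideOutputSim op = new StatsSideOutputSim(3, downstream::add, kvStore::putAll);
        for (String s : new String[] {"x", "y", "x", "x"}) {
            op.process(s);
        }
        // A downstream task (Async I/O in Flink) would look keys up in the store.
        System.out.println(downstream + " " + kvStore.get("x")); // prints [x, y, x, x] 2
    }
}
```

Every element passes through, but the store still holds the snapshot taken after the third element, so readers see statistics that lag the stream by up to `emitEvery` elements. That staleness and the extra round trip to the store are worth weighing when testing whether this approach fits.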
Kostas and Fabian,
Thanks for the advice. I guess I will find a workaround for the state redistribution. I also read about side outputs in this thread, which might also be an option I will consider: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Share-state-across-operators-td17031.html Best, Makis
Hello everyone and Happy New Year!
Regarding the heavy hitter tracking, I want to do it in a distributed manner:
1 -- Distribute the input stream round-robin to a number of parallel map instances (say p = env.parallelism).
2 -- Each of the p mappers approximately maintains the HH of its portion of the input, using an algorithm such as Space Saving, Misra-Gries, etc.
3 -- Every now and then, I would like to combine the state of all p mappers into one, producing the global Space Saving summary for the entire input stream.
4 -- Since I want to balance the load across the p mappers from the beginning, I want to use rebalance(), i.e. round robin. Thus, it is not possible to use keyed state.
5 -- So, I am going to use ListCheckpointed state as described in [1].
6 -- When the "every now and then" happens, I want to merge the partial summaries and emit them through a side output, as described in [2].
The question is the following: [1] shows an example of state redistribution. So, can I change the parallelism of the p parallel .map() instances from within the operator, and merge the HH summaries there just before emitting them to the side output? Essentially, my question is how to implement the 6th bullet. Any advice, on it or on the general implementation approach, is more than welcome. Cheers, Max
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html
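Steps 1 to 3 and the merge in step 6 can be prototyped outside Flink before deciding how to wire them into operators. The Java sketch below is one possible approach under stated assumptions (hypothetical names, no Flink dependency): `partition` assigns elements round-robin to p partial Space Saving summaries, mimicking what `rebalance()` does to records, and `merge` uses the simplest combination rule, adding counters item-wise and keeping the k largest. This naive merge is not the only option, and it can underestimate an item that was evicted from some partial summaries, which a single Space Saving summary never does for an item it monitors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Round-robin partial Space Saving summaries plus a naive merge (hypothetical names, no Flink).
final class MergeSummariesSketch {

    // One Space Saving update on a counter map that holds at most k items.
    static void offer(Map<String, Long> summary, String item, int k) {
        if (summary.containsKey(item)) {
            summary.merge(item, 1L, Long::sum);
        } else if (summary.size() < k) {
            summary.put(item, 1L);
        } else {
            String minItem = null;
            long min = Long.MAX_VALUE;
            for (Map.Entry<String, Long> e : summary.entrySet()) {
                if (e.getValue() < min) {
                    min = e.getValue();
                    minItem = e.getKey();
                }
            }
            summary.remove(minItem);
            summary.put(item, min + 1); // newcomer inherits the evicted count plus one
        }
    }

    // Round-robin the stream over p partial summaries of size k (like rebalance()).
    static List<Map<String, Long>> partition(String[] stream, int p, int k) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int i = 0; i < p; i++) {
            partials.add(new HashMap<>());
        }
        for (int i = 0; i < stream.length; i++) {
            offer(partials.get(i % p), stream[i], k);
        }
        return partials;
    }

    // Naive merge: add counters item-wise, then keep the k largest (highest first).
    static Map<String, Long> merge(List<Map<String, Long>> partials, int k) {
        Map<String, Long> sum = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((item, c) -> sum.merge(item, c, Long::sum));
        }
        List<Map.Entry<String, Long>> entries = new ArrayList<>(sum.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        Map<String, Long> merged = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : entries.subList(0, Math.min(k, entries.size()))) {
            merged.put(e.getKey(), e.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        String[] stream = {"a", "b", "a", "c", "a", "a", "b", "d"};
        List<Map<String, Long>> partials = partition(stream, 2, 2);
        System.out.println(merge(partials, 2)); // prints {a=5, d=2}
    }
}
```

With this stream, `a` (4 of 8 elements) comes out first, while `d` (1 occurrence) is reported with 2 and `b` (2 occurrences) is dropped, which shows how approximate the merged result can be with a very small k.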
Anyone, someone, somebody?
Hi,
I think it would be easier to implement a custom key selector and introduce an artificial key that spreads the load more evenly. This would also allow you to use keyed state. You could use a ProcessFunction and set timers to define the "every now and then". Keyed state would also ease state redistribution in case the parallelism changes. Maybe you could also do the summary merge in a downstream operator. Maybe this talk [1] gives you some additional inspiration. Regards, Timo [1] https://www.youtube.com/watch?v=Do7C4UJyWCM On 2/1/18 at 9:31 AM, m@xi wrote: > Anyone, someone, somebody?
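Timo's artificial-key idea can be sketched as follows, assuming the salt is attached to each element in a step before `keyBy` (in Flink, a `map` that wraps the element) rather than computed inside the key selector. As far as I know, Flink expects a key selector to return the same key every time it is applied to the same element, so drawing a random or counter-based value inside the selector itself would be a bug. This plain-Java sketch (hypothetical names, no Flink dependency) tags elements round-robin with salts in [0, n) and counts how many elements each artificial key receives.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (not Flink code) of tagging elements with an artificial key.
// The salt is computed once per element and stored with it, so a key selector
// that only reads `salt` returns the same key every time it is applied.
final class ArtificialKeySketch {
    // Element plus its artificial key; immutable once created.
    static final class Salted {
        final int salt;
        final String item;

        Salted(int salt, String item) {
            this.salt = salt;
            this.item = item;
        }
    }

    private final int numKeys;   // number of distinct artificial keys
    private long counter = 0;    // round-robin position

    ArtificialKeySketch(int numKeys) {
        this.numKeys = numKeys;
    }

    // Assigns salts 0, 1, ..., numKeys - 1, 0, 1, ... regardless of the item's value.
    Salted tag(String item) {
        return new Salted((int) (counter++ % numKeys), item);
    }

    // Counts how many elements of the stream land on each artificial key.
    static Map<Integer, Integer> loadPerKey(ArtificialKeySketch tagger, String[] stream) {
        Map<Integer, Integer> load = new HashMap<>();
        for (String s : stream) {
            load.merge(tagger.tag(s).salt, 1, Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        // Skewed input: keying by the item itself would put 5 of 6 elements on one key.
        String[] stream = {"hot", "hot", "hot", "hot", "hot", "cold"};
        System.out.println(loadPerKey(new ArtificialKeySketch(3), stream)); // prints {0=2, 1=2, 2=2}
    }
}
```

In a job, each artificial key would hold its own partial summary in keyed state, a timer in a `ProcessFunction` would emit it periodically, and a downstream operator would merge the partials; those parts are not shown. Flink hashes keys into key groups before assigning them to parallel instances, so a handful of artificial keys is not guaranteed to map one-to-one onto instances; using more keys than instances smooths this out.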
Hi Timo,
Thanks a lot for the advice. I am working on it. Cheers, Max