Maintain heavy hitters in Flink application


m@xi
Hello everyone!

I want to implement a streaming algorithm such as Misra-Gries or Space Saving
in Flink. The goal is to maintain the heavy hitters of my (possibly unbounded)
input streams for as long as my app runs. More precisely, I want a
continuously running task that executes the Space Saving algorithm and
updates a data structure that other tasks of my Flink application (e.g. map,
flatMap) can access at ad-hoc times. However, I am not sure how to achieve
this.
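
For readers unfamiliar with the algorithm named above, here is a minimal
sketch of the Space Saving update rule in plain Java, independent of Flink.
The class name `SpaceSaving` and its methods are hypothetical, items are
assumed to be strings, and the linear scan for the minimum is a simplification
(implementations usually keep the counters in a sorted structure).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks at most `capacity` candidate heavy hitters. */
final class SpaceSaving {
    private final int capacity;
    private final Map<String, Long> counts = new HashMap<>();

    SpaceSaving(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Processes one stream element. */
    void offer(String item) {
        Long current = counts.get(item);
        if (current != null) {
            // Item is already monitored: increment its counter.
            counts.put(item, current + 1);
        } else if (counts.size() < capacity) {
            // Free slot available: start monitoring the item.
            counts.put(item, 1L);
        } else {
            // Summary is full: evict the item with the smallest counter.
            // The newcomer inherits that counter plus one, so counters
            // never underestimate the true frequency.
            String minItem = null;
            long min = Long.MAX_VALUE;
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                if (e.getValue() < min) {
                    min = e.getValue();
                    minItem = e.getKey();
                }
            }
            counts.remove(minItem);
            counts.put(item, min + 1);
        }
    }

    /** Estimated count (an upper bound), or 0 if the item is not monitored. */
    long estimate(String item) {
        return counts.getOrDefault(item, 0L);
    }

    /** Copy of the current summary, e.g. for emitting it downstream. */
    Map<String, Long> snapshot() {
        return new HashMap<>(counts);
    }
}
```

With capacity 2 and the input a, b, a, c, a, the summary ends up as
{a=3, c=2}: "c" replaced "b" and inherited its counter plus one.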

First, is it possible to have a structure in my main function that is updated
by one task at all times and is also accessible by other transformations at
ad-hoc times?

Any ideas on how I can implement the above are more than welcome.

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Maintain heavy hitters in Flink application

Fabian Hueske-2
Hi Max,

State (keyed or operator state) is always local to the task.
By default, it is not accessible (for reading or writing) from outside the application or from other tasks of the application.

You can expose keyed state as queryable state [1] to perform key lookups.
This feature was designed for external applications to access the state of Flink applications.
However, it should also work from inside the same job.

Best, Fabian

2017-11-30 6:01 GMT+01:00 m@xi <[hidden email]>:

Re: Maintain heavy hitters in Flink application

m@xi
Hi Fabian,

Thanks for your answer. Initially, I had ruled out Queryable State as an
option because the documentation explicitly says that it is meant for
querying state from outside Flink.

Now that I am reading the documentation, I am not sure how to achieve
that. I would have to set ports and addresses, and I am not sure I should,
since I am reading the queryable state from inside the same job.

Can you or someone else elaborate on how I can read the queryable state of a
specific task from another task (e.g. a map)?

Best,
Max




Re: Maintain heavy hitters in Flink application

Fabian Hueske-2
Hi,

I haven't done that before either. The query API will change with the next version (Flink 1.4.0), which is currently being prepared for release.
Kostas (in CC) might be able to help you.

Best, Fabian

2017-12-05 9:52 GMT+01:00 m@xi <[hidden email]>:

Re: Maintain heavy hitters in Flink application

Kostas Kloudas
Hi Max,

You are right that Queryable State is not designed as a means for a job to query its own state.
In fact, given that you do not know the jobId of your job from within the job itself, I do not think you can use
queryable state in your scenario.

What you can do is have a flatMap that computes the hot keys or heavy hitters and emits the
elements themselves as its main output for further processing, and the computed statistics as a side output. The side output
is a data stream itself, so you can store it in an external storage system (e.g. a KV store) and use Async I/O to
query that system downstream. This solves the problem of giving all tasks access to the state.

This is a simple solution, but I am not sure about the performance implications.
You can try it to see if it actually fits your needs.

Thanks, 
Kostas


On Dec 5, 2017, at 10:32 AM, Fabian Hueske <[hidden email]> wrote:


Re: Maintain heavy hitters in Flink application

m@xi
Kostas and Fabian,

Thanks for the advice. I guess I will find a workaround to do the state
redistribution.

I also read about side outputs in this thread, which might also be an option
that I will consider.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Share-state-across-operators-td17031.html

Best,
Makis




Re: Maintain heavy hitters in Flink application

m@xi
Hello everyone and Happy New Year!

Regarding the heavy hitter tracking: I want to do it in a distributed manner.

Thus,
1 -- Round-robin the input stream to a number of parallel map instances (say
p = env.parallelism).
2 -- Each of the p mappers approximately maintains the HH of its
portion of the input, using an algorithm such as Space Saving or
Misra-Gries.
3 -- Every now and then, I would like to combine the state of all p
mappers into one, thus producing the global Space Saving summary for the
entire input stream.
4 -- Because I want to balance the load across the p mappers from the
beginning, I want to use rebalance(), i.e. round-robin partitioning.
Thus, it is not possible to use Keyed State.
5 -- So, I am going to use ListCheckpointed state as described in [1].
6 -- When the "every now and then" happens, I want to merge the partial
summaries and emit them through a side output, as described in [2].

The question is the following: [1] shows an example of state redistribution.
So, can I change the parallelism of the p parallel .map() instances from
within the operator and merge the HH summaries there, just before
emitting them to the side output?

Essentially, my question is how I should implement the 6th bullet.

Any advice on this, or on the general approach for getting the
above done, is more than welcome.
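
To make the merge in step 6 concrete, here is a minimal sketch in plain Java
of merging two partial summaries, independent of where in the job the merge
runs. It uses the merge rule known for Misra-Gries summaries (add the
counters, then subtract the (k+1)-th largest value and drop non-positive
entries), so the resulting counters are lower bounds; a Space Saving summary,
whose counters are upper bounds, would need a variant of this rule. The class
name `SummaryMerge`, the string item type, and the map representation are
assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for combining partial heavy-hitter summaries. */
final class SummaryMerge {
    private SummaryMerge() {}

    /**
     * Merges two Misra-Gries style summaries into one with at most k counters.
     * The inputs are not modified.
     */
    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b, int k) {
        // Step 1: add the counters of items that appear in either summary.
        Map<String, Long> combined = new HashMap<>(a);
        for (Map.Entry<String, Long> e : b.entrySet()) {
            combined.merge(e.getKey(), e.getValue(), Long::sum);
        }
        if (combined.size() <= k) {
            return combined;
        }

        // Step 2: find the (k+1)-th largest counter value.
        List<Long> sorted = new ArrayList<>(combined.values());
        sorted.sort(Collections.reverseOrder());
        long cut = sorted.get(k);

        // Step 3: subtract it from every counter and keep only positive ones.
        // At most k counters are strictly larger than the cut value.
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : combined.entrySet()) {
            long reduced = e.getValue() - cut;
            if (reduced > 0) {
                result.put(e.getKey(), reduced);
            }
        }
        return result;
    }
}
```

Because the operation takes two summaries and returns one of the same shape,
p partial summaries can be folded pairwise in any order, for example in a
single downstream instance that receives them.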

Cheers,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html




Re: Maintain heavy hitters in Flink application

m@xi

Re: Maintain heavy hitters in Flink application

Timo Walther
Hi,

I think it would be easier to implement a custom key selector and
introduce some artificial key that spreads the load more evenly. This
would also allow you to use keyed state. You could use a ProcessFunction
and set timers to define the "every now and then". Keyed state would
also ease the state redistribution in case the parallelism changes.
Maybe you could also do the summary merge in some downstream
operators. Maybe this talk [1] gives you some additional inspiration.
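
As an illustration of the artificial-key idea, here is a minimal sketch in
plain Java, not tied to the Flink API. It assumes the artificial key (called
a "salt" here) is attached to each record in round-robin order before the
stream is keyed, so that the key extracted later is a deterministic function
of the record. The names `SaltedKeys`, `Salted`, and `RoundRobinSalter` are
hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helpers for attaching an artificial key to records. */
final class SaltedKeys {
    private SaltedKeys() {}

    /** A record together with its artificial key. */
    static final class Salted<T> {
        final int salt;
        final T value;

        Salted(int salt, T value) {
            this.salt = salt;
            this.value = value;
        }
    }

    /** Assigns salts 0..buckets-1 in round-robin order. */
    static final class RoundRobinSalter<T> {
        private final int buckets;
        private int next = 0;

        RoundRobinSalter(int buckets) {
            if (buckets <= 0) {
                throw new IllegalArgumentException("buckets must be positive");
            }
            this.buckets = buckets;
        }

        Salted<T> tag(T value) {
            Salted<T> tagged = new Salted<>(next, value);
            next = (next + 1) % buckets;
            return tagged;
        }
    }

    /** Groups records by salt, mimicking what keying on the salt would do. */
    static <T> Map<Integer, List<T>> groupBySalt(List<Salted<T>> records) {
        Map<Integer, List<T>> groups = new HashMap<>();
        for (Salted<T> r : records) {
            groups.computeIfAbsent(r.salt, k -> new ArrayList<>()).add(r.value);
        }
        return groups;
    }
}
```

In a Flink job, the key selector would then simply return the salt field.
Note that Flink hashes keys to decide which parallel instance owns them, so p
distinct salts are not guaranteed to land on p distinct instances; using more
salts than instances may be needed to even this out.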

Regards,
Timo

[1] https://www.youtube.com/watch?v=Do7C4UJyWCM



On 2/1/18 at 9:31 AM, m@xi wrote:
> Anyone, someone, somebody?


Re: Maintain heavy hitters in Flink application

m@xi
Hi Timo,

Thanks a lot for the advice. I am working on it.

Cheers,
Max


