Newbie question


Renato Marroquín Mogrovejo
Hi all,

I have two streams, and I need to keep counts of different metrics that must be shared by both of them, so the streams will share some state once they have finished processing. My question is whether I should do this in a sink that aggregates what I need at the end, or with stateful operators.
I tried putting two HashMaps in the sinks of my streams, but one stream fills its HashMap and the other one cannot update it. I guess they are not shared, or I am doing something wrong.
Any pointers or suggestions would be great! Thanks!


Renato M.

Re: Newbie question

Gyula Fóra
Hi Renato,

First of all, to do anything with the two streams together you probably want to union them, which requires them to have a common type. If they already share one, you are lucky and don't need anything else. Otherwise I suggest using the Either type provided by Flink as a simple wrapper.
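The union-via-wrapper idea can be modeled in plain Java without Flink on the classpath. This is only a sketch: `EitherSketch` and `UnionSketch` are hypothetical names, `EitherSketch` merely stands in for Flink's `Either`, and the two `List`s stand in for the two streams. In an actual job you would map each stream into the common wrapper type and then call `union` on the resulting streams; check the Flink docs for the exact `Either` factory methods in your version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's Either: holds exactly one non-null L or R.
final class EitherSketch<L, R> {
    private final L left;
    private final R right;

    private EitherSketch(L left, R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> EitherSketch<L, R> left(L value) { return new EitherSketch<>(value, null); }
    static <L, R> EitherSketch<L, R> right(R value) { return new EitherSketch<>(null, value); }

    boolean isLeft() { return left != null; }
    L getLeft() { return left; }
    R getRight() { return right; }
}

final class UnionSketch {
    // Models stream A (String events) and stream B (Integer events) being wrapped
    // into one common element type and merged into a single sequence.
    static List<EitherSketch<String, Integer>> union(List<String> a, List<Integer> b) {
        List<EitherSketch<String, Integer>> merged = new ArrayList<>();
        for (String s : a) merged.add(EitherSketch.left(s));
        for (Integer i : b) merged.add(EitherSketch.right(i));
        return merged;
    }

    // A single downstream consumer can now update counters for both inputs.
    static long[] countBySide(List<EitherSketch<String, Integer>> merged) {
        long[] counts = new long[2]; // [0] = from stream A, [1] = from stream B
        for (EitherSketch<String, Integer> e : merged) {
            if (e.isLeft()) counts[0]++; else counts[1]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] c = countBySide(union(Arrays.asList("click", "view"), Arrays.asList(7, 8, 9)));
        System.out.println(c[0] + " " + c[1]); // prints "2 3"
    }
}
```

The point of the wrapper is that after the union there is one operator (and one place for state) that sees elements from both inputs, while still being able to tell them apart.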

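If you just instantiate HashMaps in your sinks, you will end up with as many HashMaps as the parallelism of your sink (and each of your two sinks has its own set). Each parallel instance only sees the part of the stream routed to it, so one part of the stream updates one HashMap and another part updates a different one. This is probably not what you want, because you want to aggregate statistics over the "whole" stream. Plain HashMaps are also not fault tolerant: their contents are lost if the job fails and restarts.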
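The splitting effect can be reproduced with a small plain-Java model. It is not Flink code: `ParallelSinkModel` and `SinkInstance` are hypothetical names, and events are distributed round-robin purely for simplicity (not as a claim about how Flink partitions your particular stream).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Model of a sink running with parallelism N where every parallel instance
// creates its own HashMap field. Instances never see each other's maps.
final class ParallelSinkModel {
    static final class SinkInstance {
        final Map<String, Long> counts = new HashMap<>();

        void invoke(String metric) {
            counts.merge(metric, 1L, Long::sum); // increments only this instance's map
        }
    }

    // Round-robin distribution of events over the parallel instances.
    static List<SinkInstance> run(List<String> events, int parallelism) {
        List<SinkInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) instances.add(new SinkInstance());
        for (int i = 0; i < events.size(); i++) {
            instances.get(i % parallelism).invoke(events.get(i));
        }
        return instances;
    }

    public static void main(String[] args) {
        for (SinkInstance s : run(Arrays.asList("a", "a", "a", "a"), 2)) {
            System.out.println(s.counts); // each instance prints {a=2}, never {a=4}
        }
    }
}
```

No single map ever holds the total of 4, which is the symptom described in the question.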

Because of this, you want to use some form of operator state, either partitioned (KvState) or non-partitioned (the Checkpointed interface), depending on the application logic.

If the statistics you are aggregating are tied to keys (substreams) of the incoming stream, you should probably use a ValueState in your sink. You obtain it from the RuntimeContext of a rich function, in this case a RichSinkFunction, and the stream has to be keyed (keyBy) before the sink for this to work. For more details see here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#using-the-keyvalue-state-interface
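Why keyed state fixes the splitting problem can be sketched in plain Java. This model is an assumption-laden illustration, not Flink's API: `KeyedStateModel` is a hypothetical name, the per-instance `HashMap` stands in for the per-key ValueState backend, and `floorMod(hashCode)` is a simplified stand-in for how keyBy assigns keys to instances. In a real job you would obtain the ValueState via `getRuntimeContext().getState(...)` with a ValueStateDescriptor, typically in `open()`; see the linked docs for the exact constructor in your Flink version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Model of keyed (partitioned) state: every record with the same key is routed
// to the same parallel instance, so each key's count lives in exactly one place.
final class KeyedStateModel {
    static final class KeyedSinkInstance {
        // Stands in for the per-key ValueState<Long> of one parallel instance.
        final Map<String, Long> valueStatePerKey = new HashMap<>();

        void invoke(String key) {
            Long current = valueStatePerKey.get(key);            // like reading the state's value
            long updated = (current == null ? 0L : current) + 1; // model: null before first update
            valueStatePerKey.put(key, updated);                  // like updating the state
        }
    }

    // Hash-based routing: a simplified stand-in for keyBy.
    static List<KeyedSinkInstance> run(List<String> keys, int parallelism) {
        List<KeyedSinkInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) instances.add(new KeyedSinkInstance());
        for (String key : keys) {
            instances.get(Math.floorMod(key.hashCode(), parallelism)).invoke(key);
        }
        return instances;
    }

    // The complete count for a key is held by exactly one instance.
    static long countFor(List<KeyedSinkInstance> instances, String key) {
        long total = 0;
        int holders = 0;
        for (KeyedSinkInstance inst : instances) {
            Long v = inst.valueStatePerKey.get(key);
            if (v != null) { total += v; holders++; }
        }
        if (holders > 1) throw new IllegalStateException("key split across instances");
        return total;
    }
}
```

Unlike the unkeyed case, the sink can stay parallel here: the per-key counts are complete because no key is ever spread over two instances.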

If you want to aggregate over the whole incoming stream rather than per key, you have to use a non-parallel sink, since this is an inherently non-parallel operation. In this case, set the sink parallelism explicitly to 1: stream.addSink(...).setParallelism(1); Your sink can then implement the Checkpointed interface, which you can use to persist your HashMaps. More info can be found here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#checkpointing-local-variables
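The snapshot/restore pattern for a single global sink can be sketched in plain Java. `SnapshotSketch` is a hypothetical interface that only mirrors the idea behind Flink's Checkpointed interface (hand over the state to persist, take it back after a failure); it is not Flink's actual type, so check the linked docs for the real method signatures.

```java
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical interface mirroring the snapshot/restore idea of Checkpointed.
interface SnapshotSketch<T extends Serializable> {
    T snapshot();
    void restore(T state);
}

// Model of a parallelism-1 sink that sees the whole stream and keeps global
// counters in a HashMap, which it hands over for checkpointing.
final class GlobalCountSink implements SnapshotSketch<HashMap<String, Long>> {
    private HashMap<String, Long> counts = new HashMap<>();

    void invoke(String metric) {
        counts.merge(metric, 1L, Long::sum);
    }

    long get(String metric) {
        return counts.getOrDefault(metric, 0L);
    }

    @Override
    public HashMap<String, Long> snapshot() {
        return new HashMap<>(counts); // copy, so later updates do not alter the snapshot
    }

    @Override
    public void restore(HashMap<String, Long> state) {
        counts = new HashMap<>(state); // resume from the last persisted counts
    }
}
```

For example, after invoking `"a"`, `"b"`, `"a"` and taking a snapshot, a fresh `GlobalCountSink` restored from that snapshot reports 2 for `"a"` and 1 for `"b"`, independent of any updates the original instance received afterwards.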

I hope this helps.

Cheers,
Gyula

In reply to Renato Marroquín Mogrovejo <[hidden email]>, Sun, Feb 14, 2016, 22:22.