Hello Flinkers,
I would like to discuss something that bothers me. I have two streams that I want to join, plus a third stream whose data I want to consult from time to time to trigger decisions. Essentially, this boils down to co-processing 3 streams together instead of 2, which to the best of my knowledge is not possible.

My idea is to append an extra field to the two streams I want to join. Say S1 and S2 are the streams with tuples t1 and t2. After adding the extra field, which is the stream id (1 or 2), the tuples become (1, t1) and (2, t2), giving S1' and S2'. Then S1'.union(S2') gives me a single data stream, which I can then join with the third stream and process with a CoProcessFunction. However, whenever I process an element from the united stream, I need an if-then-else to check which stream the tuple belongs to, and then update the S1' or S2' state accordingly.

Do you think this is a good idea, in terms of efficiency, compared with having two functions do this, namely processElement1() and processElement2() of the CoProcessFunction, as in the case where I only had two streams? And if the aforementioned scheme is feasible, then I guess it is currently the only way of joining more than 2 streams. Am I right?

Thanks in advance for your help.

Best,
Max
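(For illustration only, the tag-and-dispatch idea can be sketched in plain Java, outside the Flink API — the `Tagged` record and `dispatch` method below are made-up names, not Flink classes:)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TagAndDispatch {
    // A tuple tagged with the id of the stream it came from (1 or 2).
    record Tagged(int streamId, String payload) {}

    // Dispatch each element of the unioned stream to per-stream state,
    // mimicking the if/else inside a single processElement().
    static Map<Integer, List<String>> dispatch(List<Tagged> union) {
        Map<Integer, List<String>> state = new HashMap<>();
        state.put(1, new ArrayList<>());
        state.put(2, new ArrayList<>());
        for (Tagged t : union) {
            if (t.streamId() == 1) {
                state.get(1).add(t.payload()); // what processElement1() would do
            } else {
                state.get(2).add(t.payload()); // what processElement2() would do
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Tagged> union = List.of(
                new Tagged(1, "a"), new Tagged(2, "x"), new Tagged(1, "b"));
        System.out.println(dispatch(union)); // {1=[a, b], 2=[x]}
    }
}
```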
Hi Max,
if I understood correctly, instead of joining three streams, you actually perform two separate joins, say S1 JOIN S3 and S2 JOIN S3, right? Your plan "(S1 UNION S2) JOIN S3" seems to be identical to "(S1 JOIN S3) UNION (S2 JOIN S3)", and if that's what you need, your pipeline should be feasible, I think. However, if you want to join all three streams together, you may first join S1 with S2 using a CoProcessFunction to produce S12, and then use another CoProcessFunction to join S12 with S3.

Hope that helps.

Best,
Xingcan
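(As an aside, the cascaded plan (S1 JOIN S2) JOIN S3 reduces a three-way join to two two-way joins, each of which a CoProcessFunction can handle. A minimal plain-Java sketch of that idea, using keyed maps in place of streams — illustrative only, not Flink API:)

```java
import java.util.Map;
import java.util.TreeMap;

public class CascadedJoin {
    // Inner-join two keyed maps on their common key, concatenating the values.
    static Map<String, String> join(Map<String, String> left, Map<String, String> right) {
        Map<String, String> out = new TreeMap<>();
        for (var e : left.entrySet()) {
            String r = right.get(e.getKey());
            if (r != null) out.put(e.getKey(), e.getValue() + "|" + r);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> s1 = Map.of("k1", "a", "k2", "b");
        Map<String, String> s2 = Map.of("k1", "x", "k3", "y");
        Map<String, String> s3 = Map.of("k1", "7");

        // S12 = S1 JOIN S2, then S123 = S12 JOIN S3 — each step is a
        // two-way join, just as each CoProcessFunction in the cascade
        // handles exactly two inputs.
        Map<String, String> s12 = join(s1, s2);
        Map<String, String> s123 = join(s12, s3);
        System.out.println(s123); // {k1=a|x|7}
    }
}
```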
Hello XingCan,
Finally, I did it with union. Now, inside the processElement() function of my CoProcessFunction, I am setting a timer so that I can periodically print out some data from the onTimer() function. I get the following exception:

"Caused by: java.lang.UnsupportedOperationException: Setting timers is only supported on a keyed streams."

My CoProcessFunction is an operator with parallelism=1 (I also use forceNonParallel() to make sure of that). Thus, I am not using keyed state. Is keyed state the only way of using timers?

Furthermore, I must confess that the API is not so clear to me regarding managed operator state, so I am currently NOT implementing CheckpointedFunction etc. Is my application going to return the correct results, assuming no failures?

Thanks in advance.

Best,
Max
Hi Max,

you can use keyed state on an operator with parallelism 1 if you assign a default key with a KeySelector:

    stream.keyBy(new NullByteKeySelector<>())

with NullByteKeySelector defined as:

    public class NullByteKeySelector<T> implements KeySelector<T, Byte> {

        private static final long serialVersionUID = 614256539098549020L;

        @Override
        public Byte getKey(T value) throws Exception {
            return 0;
        }
    }

With this trick, all records are assigned to the same key, so you can use keyed state and timers.

Best,
Fabian
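(To see why this works, note that keyed partitioning routes each record by a hash of its key; with a constant key, every record lands on the same task. A simplified stand-in for the partitioning step — the `partition` function below is a made-up model, not Flink's actual key-group hashing:)

```java
public class SingleKeyDemo {
    // Simplified model of keyBy routing: partition = hash(key) mod parallelism.
    // (Flink actually hashes keys into key groups, but the idea is the same.)
    static int partition(byte key, int parallelism) {
        return Math.floorMod(Byte.hashCode(key), parallelism);
    }

    public static void main(String[] args) {
        // NullByteKeySelector gives every record the key 0 ...
        byte key = 0;
        // ... so every record is routed to the same partition,
        // regardless of the configured parallelism.
        System.out.println(partition(key, 1)); // 0
        System.out.println(partition(key, 4)); // 0
        System.out.println(partition(key, 8)); // 0
    }
}
```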
OK Great!
Thanks a lot for the super fast answer, Fabian!

One intuitive follow-up question. So, keyed state is the most preferable one, as it is easy for the Flink system to perform redistribution when the parallelism changes, e.g. on a scale-up or scale-down. Also, keyed state is useful for hash-partitioning a stream to different nodes/processors/PUs (Processing Units) in general. Are there any other reasons that make keyed state a must?

Last but not least, can you elaborate further on the "when the parallelism changes" part? I have read this in many topics in this forum, but I cannot understand its essence. For example, I define the parallelism of each operator in my Flink job based on the number of available PUs. Maybe the essence lies in the fact that the number of PUs might change from time to time, e.g. when more servers are added to the cluster where Flink runs, and the rescaling can be performed without stopping the running Flink job.

Thanks in advance.

Best,
Max
Hi Max,
Currently, timers can only be used with keyed streams. As Fabian suggested, you can "forge" a keyed stream with the special KeySelector, which maps all the records to the same key.

IMO, Flink builds on keyed streams/states because they provide a deterministic distribution mechanism. Here, "the parallelism changes" may also refer to a parallelism change after the job restarts (e.g., when a node crashes). Flink can make sure that all the processing tasks and states will be safely re-distributed across the new cluster.

Hope that helps.

Best,
Xingcan
You might also want to change the parallelism if the rate of your input streams varies, e.g., scaling an application down over night or over the weekend.
Thanks a lot Fabian and Xingcan!
@Fabian: Nice answer, it enhanced my intuition on the topic. So, how may one change the parallelism while the Flink job is running, e.g. lower the parallelism during the weekend?

Also, it is not clear to me how to use the rescale() operator. Could you provide a more thorough example? The one in the documentation is not so good, in my humble opinion. Some code/pseudocode would be great.

Thanks in advance.

Best,
Max
Changing the parallelism works in Flink by taking a savepoint, shutting down the job, and restarting it from the savepoint with another parallelism.

The rescale() operator defines how records are exchanged between two operators with different parallelism. For example, if you have a pipeline A -rescale-> B, where operator A has 2 tasks and operator B has 4 tasks, then A(1) would send data to B(1) and B(3), and A(2) to B(2) and B(4). Since A(1) / B(1) and A(2) / B(2) run on the same machine (unless explicitly scheduled differently), the data exchange between them is local.

Best,
Fabian
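(The task-to-task assignment described above can be computed explicitly. The sketch below models it as a round-robin over downstream subtasks, using 0-based indices; this is a simplified illustration based on the 2-to-4 example, not Flink's actual implementation:)

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleMapping {
    // For upstream parallelism p and downstream parallelism q (q a multiple
    // of p), give upstream task i the downstream tasks j with j % p == i.
    static List<Integer> targets(int upstreamTask, int p, int q) {
        List<Integer> out = new ArrayList<>();
        for (int j = 0; j < q; j++) {
            if (j % p == upstreamTask) out.add(j);
        }
        return out;
    }

    public static void main(String[] args) {
        // A has 2 tasks, B has 4 (0-based): A(0) -> B(0), B(2); A(1) -> B(1), B(3)
        System.out.println(targets(0, 2, 4)); // [0, 2]
        System.out.println(targets(1, 2, 4)); // [1, 3]
    }
}
```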
Hey Fabian!
Thanks for the comprehensive replies. Now I understand those concepts properly.

Regarding rescale(): it does not receive any arguments. Thus, I assume that the way it does the shuffling from operator A to operator B instances is a black box for the programmer and probably has to do with the number of slots in each TaskManager. It strives to favour local data exchange (*intra-exchange*, between slots of the same TaskManager) instead of *inter-exchange* of data between different TaskManagers (which burdens the network). Am I correct?

Thanks in advance.

Best,
Max
I don't think that the mapping is that sophisticated. I'd assume it is a bit simpler and just keeps one local pipeline (the one with the same subtask index), which will run in the same slot (unless explicitly configured differently).
OK man! Thanks a lot.
To tell you the truth, the documentation did not explain it in a convincing way that would make me consider it an important operator for my applications. Thanks for mentioning it.

Best,
Max