CoProcess() VS union.Process()

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

CoProcess() VS union.Process()

m@xi
Hello Flinkers,

I would like to discuss with you about something that bothers me. So, I have
two streams that I want to join along with a third stream which I want to
consult its data from time to time and triggers decisions.

Essentially, this boils down to coProcessing 3 streams together instead of
2, which to the best of my knowledge is not possible.

I thought to append an extra field to the 2 streams I want to join, namely
say S1, S2 are the streams with tuples t1, t2. After the pumping with the
extra field which is the stream id (1 or 2) the tuple would be (1, t1) and
(2, t2) resulting to S1' and S2'.

Then I will do S1'.union(S2') which gives me a single data stream. Then this
I may join with the 3rd stream and do the processing with a coProcess
function.

Although, whenever I process and element from the united streams I should
have an if-then-else to check to which stream a tuple belongs and process
and update S1' and S2' state accordingly.

Do you think this is a good idea? In terms of efficiency compared with
having two functions to do this, namely processElement1() and
processElement2() of the coProcess function in case I only had two streams.

And if the aforementioned scheme is feasible, then I guess up til now, this
is the only way of joining more than 2 streams. Am I right?

Thanks in advance for your help.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process()

Xingcan Cui
Hi Max,

if I understood correctly, instead of joining three streams, you actually performed two separate joins, say S1 JOIN S3 and S2 JOIN S3, right? Your plan "(S1 UNION S2) JOIN S3” seems to be identical with “(S1 JOIN S3)  UNION (S2 JOIN S3)” and if that’s what you need, your pipeline should be feasible I think.

However, If you want to join three streams, you may first join S1 with S2 to produce S12 with a CoProcessFunction, and then set another CoProcessFunction to join S12 with S3.

Hope that helps.

Best,
Xingcan

On 10 Feb 2018, at 1:06 PM, m@xi <[hidden email]> wrote:

Hello Flinkers,

I would like to discuss with you about something that bothers me. So, I have
two streams that I want to join along with a third stream which I want to
consult its data from time to time and triggers decisions.

Essentially, this boils down to coProcessing 3 streams together instead of
2, which to the best of my knowledge is not possible.

I thought to append an extra field to the 2 streams I want to join, namely
say S1, S2 are the streams with tuples t1, t2. After the pumping with the
extra field which is the stream id (1 or 2) the tuple would be (1, t1) and
(2, t2) resulting to S1' and S2'.

Then I will do S1'.union(S2') which gives me a single data stream. Then this
I may join with the 3rd stream and do the processing with a coProcess
function.

Although, whenever I process and element from the united streams I should
have an if-then-else to check to which stream a tuple belongs and process
and update S1' and S2' state accordingly.

Do you think this is a good idea? In terms of efficiency compared with
having two functions to do this, namely processElement1() and
processElement2() of the coProcess function in case I only had two streams.

And if the aforementioned scheme is feasible, then I guess up til now, this
is the only way of joining more than 2 streams. Am I right?

Thanks in advance for your help.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

m@xi
Hello XingCan,

Finally, I did it with union.

Now inside the processElement() function of my CoProcessFunction I am
setting a timer and periodically I want to print out some data through the
onTimer() function.

Below I attach the image stating the following: "Caused by:
java.lang.UnsupportedOperationException: Setting timers is only supported on
a keyed streams."

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1161/Screen_Shot_2018-02-13_at_16.png>

My CoProcessFunction is an operator with parallelism=1 (I also use
forceNonParallel()) to make sure about that. Thus, I am not using a Keyed
State.

Is the Keyed State the only way of using Timers?

Furthermore, I must confess that the API is not so clear for the Managed
Operator State, so I am currently NOT implementing any CheckpointedFunction
etc etc.

Is my application going to return the correct results, if I assume no
failures etc etc.?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

Fabian Hueske-2
Hi Max,

you can use keyed state on an operator with parallelism 1 if you assign a default key with a KeySelector:

stream.keyBy(new NullByteKeySelector)

with NullByteKeySelector defined as

public class NullByteKeySelector<T> implements KeySelector<T, Byte> {

   private static final long serialVersionUID = 614256539098549020L;

   @Override
   public Byte getKey(T value) throws Exception {
      return 0;
   }
}

With this trick, all records are assigned to the same key and you can use keyed state and timers.

Best, Fabian

2018-02-13 9:59 GMT+01:00 m@xi <[hidden email]>:
Hello XingCan,

Finally, I did it with union.

Now inside the processElement() function of my CoProcessFunction I am
setting a timer and periodically I want to print out some data through the
onTimer() function.

Below I attach the image stating the following: "Caused by:
java.lang.UnsupportedOperationException: Setting timers is only supported on
a keyed streams."

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1161/Screen_Shot_2018-02-13_at_16.png>

My CoProcessFunction is an operator with parallelism=1 (I also use
forceNonParallel()) to make sure about that. Thus, I am not using a Keyed
State.

Is the Keyed State the only way of using Timers?

Furthermore, I must confess that the API is not so clear for the Managed
Operator State, so I am currently NOT implementing any CheckpointedFunction
etc etc.

Is my application going to return the correct results, if I assume no
failures etc etc.?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

m@xi
OK Great!

Thanks a lot for the super ultra fast answer Fabian!

One intuitive follow-up question.

So, keyed state is the most preferable one, as it is easy for the Flink
System to perform the re-distribution in case of change in parallelism, if
we have a scale-up or scale-down. Also, it is useful to use hash partition a
stream to different nodes/processors/PU (Processing Units) in general, by
Keyed State.

Any other reasons for making Keyed State a must?

Last but not least, can you elaborate further on the "when the parallelism
changes" part. I have read this in many topics in this forum, but I cannot
understand its essence. For example, I define the parallelism of each
operator in my Flink Job program based on the number of available PU. Maybe
the essence lies in the fast that the number of PU might change from time to
time, e.g. add more servers to the cluster where Flink runs and without
stopping the Flink Job that runs you may perform the rescaling.

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

Xingcan Cui
Hi Max,

Currently, the timers can only be used with keyed streams. As @Fabian suggested, you can “forge” a keyed stream with the special KeySelector, which maps all the records to the same key.

IMO, Flink uses keyed streams/states as it’s a deterministic distribution mechanism. Here, “the parallelism changes” may also refer to a parallelism change after the job restarts (e.g., when a node crashes). Flink can make sure that all the processing tasks and states will be safely re-distributed across the new cluster.

Hope that helps.

Best,
Xingcan

> On 13 Feb 2018, at 5:18 PM, m@xi <[hidden email]> wrote:
>
> OK Great!
>
> Thanks a lot for the super ultra fast answer Fabian!
>
> One intuitive follow-up question.
>
> So, keyed state is the most preferable one, as it is easy for the Flink
> System to perform the re-distribution in case of change in parallelism, if
> we have a scale-up or scale-down. Also, it is useful to use hash partition a
> stream to different nodes/processors/PU (Processing Units) in general, by
> Keyed State.
>
> Any other reasons for making Keyed State a must?
>
> Last but not least, can you elaborate further on the "when the parallelism
> changes" part. I have read this in many topics in this forum, but I cannot
> understand its essence. For example, I define the parallelism of each
> operator in my Flink Job program based on the number of available PU. Maybe
> the essence lies in the fast that the number of PU might change from time to
> time, e.g. add more servers to the cluster where Flink runs and without
> stopping the Flink Job that runs you may perform the rescaling.
>
> Thanks in advance.
>
> Best,
> Max
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

Fabian Hueske-2
You might also want to change the parallelism if the rate of your input streams varies, e.g., you scale an application down over night or the weekend.

2018-02-13 13:43 GMT+01:00 Xingcan Cui <[hidden email]>:
Hi Max,

Currently, the timers can only be used with keyed streams. As @Fabian suggested, you can “forge” a keyed stream with the special KeySelector, which maps all the records to the same key.

IMO, Flink uses keyed streams/states as it’s a deterministic distribution mechanism. Here, “the parallelism changes” may also refer to a parallelism change after the job restarts (e.g., when a node crashes). Flink can make sure that all the processing tasks and states will be safely re-distributed across the new cluster.

Hope that helps.

Best,
Xingcan

> On 13 Feb 2018, at 5:18 PM, m@xi <[hidden email]> wrote:
>
> OK Great!
>
> Thanks a lot for the super ultra fast answer Fabian!
>
> One intuitive follow-up question.
>
> So, keyed state is the most preferable one, as it is easy for the Flink
> System to perform the re-distribution in case of change in parallelism, if
> we have a scale-up or scale-down. Also, it is useful to use hash partition a
> stream to different nodes/processors/PU (Processing Units) in general, by
> Keyed State.
>
> Any other reasons for making Keyed State a must?
>
> Last but not least, can you elaborate further on the "when the parallelism
> changes" part. I have read this in many topics in this forum, but I cannot
> understand its essence. For example, I define the parallelism of each
> operator in my Flink Job program based on the number of available PU. Maybe
> the essence lies in the fast that the number of PU might change from time to
> time, e.g. add more servers to the cluster where Flink runs and without
> stopping the Flink Job that runs you may perform the rescaling.
>
> Thanks in advance.
>
> Best,
> Max
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

m@xi
Thanks a lot Fabian and Xingcan!

@ Fabian : Nice answer. It enhanced my intuition on the topic. So, how one
may change the parallelism while the Flink job is running, e.g. lower the
parallelism during the weekend?


Also, it is not clear to me how to use the rescale() operator. If you may
provide a more thorough example, cause the one in the documentation is not
so good in my humble opinion. With some code/pseudo code, it would be great.

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

Fabian Hueske-2
Changing the parallelism works in Flink by taking a savepoint, shutting down the job, and restarting it from the savepoint with another parallelism.

The rescale() operator defines how records are exchanged between two operators with different parallelism.
Rescale prefers local data exchange over uniform distribution (which would be rebalance()).

For example if you have a pipeline A -rescale-> B, where operator A has 2 tasks and operator B 4 tasks, then A(1) would send data to B(1) and B(3) and A(2) to B(2) and B(4).
Since A(1) / B(1) and A(2) / B(2) run on the same machine (unless explicitly differently scheduled), the data exchange between them is local.

Best, Fabian

2018-02-13 16:22 GMT+01:00 m@xi <[hidden email]>:
Thanks a lot Fabian and Xingcan!

@ Fabian : Nice answer. It enhanced my intuition on the topic. So, how one
may change the parallelism while the Flink job is running, e.g. lower the
parallelism during the weekend?


Also, it is not clear to me how to use the rescale() operator. If you may
provide a more thorough example, cause the one in the documentation is not
so good in my humble opinion. With some code/pseudo code, it would be great.

Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

m@xi
Hey Fabian!

Thanks for the comprehensive replies. Now I understand those concepts
properly.

Regarding .rescale() , it does not receive any arguments. Thus, I assume
that the way it does the shuffling from operator A to operator B instances
is a black box for the programmer and probably has to do with the number of
slots in each taskmanager. It strives to favour local data exchange (aka
*intra-exchange* : between slot of the same taskmanager) instead of
*inter-exchange* of data between different taskmanagers (that burdens the
network).

Am I correct?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

Fabian Hueske-2
I don't think that the mapping is that sophisticated.
I'd assume it is a bit simpler and just keeps one local pipeline (the one with the same subtask index) which will run in the same slot (unless explicitly configured differently).

TBH, I would not rely on this behavior. rescale() is rather an artifact of the first version of the DataStream API.

Best, Fabian

2018-02-20 11:00 GMT+01:00 m@xi <[hidden email]>:
Hey Fabian!

Thanks for the comprehensive replies. Now I understand those concepts
properly.

Regarding .rescale() , it does not receive any arguments. Thus, I assume
that the way it does the shuffling from operator A to operator B instances
is a black box for the programmer and probably has to do with the number of
slots in each taskmanager. It strives to favour local data exchange (aka
*intra-exchange* : between slot of the same taskmanager) instead of
*inter-exchange* of data between different taskmanagers (that burdens the
network).

Am I correct?

Reply | Threaded
Open this post in threaded view
|

Re: CoProcess() VS union.Process() & Timers in them

m@xi
OK man! Thanks a lot.

To tell you the truth the documentation did not explain it in a convincing
way to consider it an important/potential operator to use in my
applications.

Thanks for mentioning.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/