Start streaming tuples depending on another streams rate

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Start streaming tuples depending on another streams rate

Jonas Gröger
This post was updated on .
Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.
A
    .connect(B)
    .keyBy(_.id, _.id)
    .flatMap(new MyOp)

In MyOp, the A stream tuples are combined to form a state using a ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s. A big drop.

What I now want is that while tuples from A are being processed in flatMap1, the stream B in flatMap2 should wait until the rate of the A stream has dropped and only then, be flatMap2 should be called. Ideally, this behaviour would be captured in a separate operator, like RateBasedStreamValve or something like that :)

To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction that counts how many tuples have been processed from A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2 that proesses tuples from B empties the buffer. However, this would make my RichCoFlatMapFunction much bigger and would not allow for operator reuse in other scenarios.

I'm of course happy to answer if something is unclear.

-- Jonas

Reply | Threaded
Open this post in threaded view
|

Re: Start streaming tuples depending on another streams rate

Tzu-Li (Gordon) Tai
Hi Jonas,

A few things to clarify first:

Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s.

From this description it seems like the job is re-reading from the beginning from the topic, and once you reach the latest record at the head of the queue, you start getting the normal input rate again, correct?

What I now want is that while tuples from A are being processed in flatMap1, the stream B in flatMap2 should wait until the rate of the A stream has dropped and only then, be flatMap2 should be called.

So what you are looking for is that flatMap2 for stream B only doing work after the job reaches the latest record in stream A?

If that’s the case, I would not rely on determining a drop on the threshold rate value. It isn’t reliable because it’s dependent on stream A’s actual input rate, which naturally as a stream changes over time.

I’m not sure if it’s the best solution, but this is what I have in mind:
You could perhaps insert a special marker event into stream A every time you start running this job.
Your job can have an operator before your co-flatMap operator that expects this special marker, and when it receives it (which is when the head of stream A is reached),  broadcasts a special event to the co-flatMap for flatMap2 to be processed.
Then, once flatMap2 is invoked with the special event, you can toggle logic in flatMap2 to actually start doing stuff.

Cheers,
Gordon

On February 9, 2017 at 8:09:33 PM, Jonas ([hidden email]) wrote:

Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.
A
    .connect(B)
    .keyBy(_.id, _.id)
    .flatMap(new MyOp)
In MyOp, the A stream tuples are combined to form a state using a ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s. A big drop. What I now want is that while tuples from A are being processed in flatMap1, the stream B in flatMap2 should wait until the rate of the A stream has dropped and only then, be flatMap2 should be called. Ideally, this behaviour would be captured in a separate operator, like RateBasedStreamValve or something like that :) To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction that counts how many tuples have been processed from A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2 that proesses tuples from B empties the buffer. However, this would make my RichCoFlatMapFunction much bigger and would not allow for operator reuse in other scenarios. I'm of course happy to answer if something is unclear. -- Jonas

View this message in context: Start streaming tuples depending on another streams rate
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Start streaming tuples depending on another streams rate

Jonas Gröger
Tzu-Li (Gordon) Tai wrote
Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s.
Absolutely correct.
Tzu-Li (Gordon) Tai wrote
So what you are looking for is that flatMap2 for stream B only doing work after the job reaches the latest record in stream A?
Very much so.
Tzu-Li (Gordon) Tai wrote
You could perhaps insert a special marker event into stream A every time you start running this job. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s.
I tried using stream punctuations but it is hard to know which one is the "last" punctuation, since after some time I might have mutliple in there.

Imagine stream A has in total about 100M messages. We insert a Punctuation as message number 100.000.001. Works.

Next week we need to start the job again. Stream A now has 110M messages and 2 punctuation marks. One at the 100.000.001 and one at 110.000.001.

I cannot decide which one is the latest while processing the the stream.

Tzu-Li (Gordon) Tai wrote
Then, once flatMap2 is invoked with the special event, you can toggle logic in flatMap2 to actually start doing stuff.
This has the issue that while stream A is being processed, I lose tuples from stream B because it is not "stopped". I think my use case is currently not really doable in Flink. -- Jonas
Reply | Threaded
Open this post in threaded view
|

Re: Start streaming tuples depending on another streams rate

Aljoscha Krettek
Hi,
I think there are two (somewhat) orthogonal problems here:

1) Determining when a stream of input data switches from the "reading old data" to the "reading current data"  phase.

2) Blocking/buffering one input of an operator depending on some condition on the other input.

I think 1. can only be solved by user code. 2) is quite hard: we would either need to buffer elements of the blocked input and only start processing once we are "allowed to" this can blow up very quickly and we have to checkpoint those buffered elements or we would have to block receiving data on that input which is not possible right now because this can potentially lead to deadlocks in the topology.

I'm afraid there is no good solution for this right now.

Cheers,
Aljoscha

On Fri, 10 Feb 2017 at 09:26 Jonas <[hidden email]> wrote:
Tzu-Li (Gordon) Tai wrote
Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s.
Absolutely correct.
Tzu-Li (Gordon) Tai wrote
So what you are looking for is that flatMap2 for stream B only doing work after the job reaches the latest record in stream A?
Very much so.
Tzu-Li (Gordon) Tai wrote
You could perhaps insert a special marker event into stream A every time you start running this job. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s.
I tried using stream punctuations but it is hard to know which one is the "last" punctuation, since after some time I might have mutliple in there.

Imagine stream A has in total about 100M messages. We insert a Punctuation as message number 100.000.001. Works.

Next week we need to start the job again. Stream A now has 110M messages and 2 punctuation marks. One at the 100.000.001 and one at 110.000.001.

I cannot decide which one is the latest while processing the the stream.
Tzu-Li (Gordon) Tai wrote
Then, once flatMap2 is invoked with the special event, you can toggle logic in flatMap2 to actually start doing stuff.
This has the issue that while stream A is being processed, I lose tuples from stream B because it is not "stopped". I think my use case is currently not really doable in Flink. -- Jonas

View this message in context: Re: Start streaming tuples depending on another streams rate
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Reply | Threaded
Open this post in threaded view
|

Re: Start streaming tuples depending on another streams rate

Jonas Gröger
For 2: You can also NOT read the Source (i.e. Kafka) while doing that. This way you don't have to buffer.
Reply | Threaded
Open this post in threaded view
|

Re: Start streaming tuples depending on another streams rate

Aljoscha Krettek
Yes, but that information would have to "bubble" up from the downstream operator to the source, which is not possible right now.

On Sun, 12 Feb 2017 at 17:15 Jonas <[hidden email]> wrote:
For 2: You can also NOT read the Source (i.e. Kafka) while doing that. This
way you don't have to buffer.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Start-streaming-tuples-depending-on-another-streams-rate-tp11542p11590.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.