synchronizing two streams

Alexander Gryzlov
Hello,

We're implementing a streaming outer join operator based on a TwoInputStreamOperator with an internal buffer. In our use case, only items whose timestamps are within a few seconds of each other can join, so we need to synchronize the two input streams to maximize the join yield. Our plan is to use the watermark mechanism to implement some sort of "throttling" operator, which would take two streams and stop passing one of them through based on the watermarks in the other. However, there doesn't seem to be an operator of the shape (A,B)->(A,B) in Flink, where A and B can be received and emitted independently. What would be a resource-saving way to implement such an operator (e.g., without spawning two more parallel TwoInputStreamOperators)?

Alex

Re: synchronizing two streams

Matthias J. Sax-2
I cannot follow completely. TwoInputStreamOperator defines two methods
for processing watermarks, one per input stream.

So you can sync both streams within the outer join operator you plan to
implement.

-Matthias
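
As a rough illustration of this suggestion, the sketch below models an operator that receives one watermark callback per input. It is plain Java, not Flink code: the method names mirror processWatermark1/processWatermark2, but the class name TwoInputWatermarkTracker and the use of bare long timestamps are assumptions made for the example. The combined event time is taken as the minimum of the two inputs, so the slower stream decides how far the operator may advance.

```java
// Plain-Java model, not the Flink API: the class and its fields are hypothetical.
final class TwoInputWatermarkTracker {
    private long watermark1 = Long.MIN_VALUE; // latest watermark seen on input 1
    private long watermark2 = Long.MIN_VALUE; // latest watermark seen on input 2

    /** Called when input 1 reports progress in event time. */
    void processWatermark1(long timestamp) {
        watermark1 = Math.max(watermark1, timestamp); // watermarks never move backwards
    }

    /** Called when input 2 reports progress in event time. */
    void processWatermark2(long timestamp) {
        watermark2 = Math.max(watermark2, timestamp);
    }

    /** Event time that both inputs have reached; the slower input decides. */
    long combinedWatermark() {
        return Math.min(watermark1, watermark2);
    }
}
```

With both values available inside one operator, the join logic can compare them directly instead of relying on a separate synchronizing operator.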

On 05/11/2016 05:00 PM, Alexander Gryzlov wrote:

> Hello,
>
> We're implementing a streaming outer join operator based on a
> TwoInputStreamOperator with an internal buffer. In our use-case only the
> items whose timestamps are within a several-second interval of each
> other can join, so we need to synchronize the two input streams to
> ensure maximal yield. Our plan is to utilize the watermark mechanism to
> implement some sort of a "throttling" operator, which would take two
> streams and stop passing through one of them based on the watermarks in
> another. However, there doesn't seem to exist an operator of the shape
> (A,B)->(A,B) in Flink, where A and B can be received and emitted
> independently. What would be a resource-saving way to implement such
> (e.g., without spawning two more parallel TwoInputStreamOperators)?
>
> Alex



Re: synchronizing two streams

Alexander Gryzlov
Hmm, probably I don't really get how Flink's execution model works. As far as I understand, the preferred way to throttle down stream consumption is to simply have an operator with a conditional Thread.sleep() inside. Wouldn't calling sleep() in either of TwoInputStreamOperator's processWatermarkN() methods just freeze the entire operator, stopping the consumption of both streams (as opposed to just one)?

Alex
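
The concern can be reproduced with a toy model, assuming a single task thread invokes the callbacks of both inputs. The class SingleThreadedOperatorDemo, its event names, and the latch that stands in for Thread.sleep() are all hypothetical and not part of Flink. While the thread is paused inside the "watermark1" handler, the queued element of the second input is not processed either.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model, not Flink code: one task thread serves the events of both inputs.
final class SingleThreadedOperatorDemo {
    final BlockingQueue<String> events = new LinkedBlockingQueue<>(); // both inputs, in arrival order
    final List<String> processed = new CopyOnWriteArrayList<>();
    private final CountDownLatch blocked = new CountDownLatch(1);
    private final CountDownLatch resumeSignal = new CountDownLatch(1); // stands in for sleep() returning
    private Thread task;

    /** Starts the single task loop; it ends when the "stop" marker is taken. */
    void start() {
        task = new Thread(() -> {
            try {
                for (String e = events.take(); !e.equals("stop"); e = events.take()) {
                    if (e.equals("watermark1")) {  // the watermark callback decides to pause
                        blocked.countDown();
                        resumeSignal.await();      // nothing else runs on this thread meanwhile
                    }
                    processed.add(e);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
    }

    /** Waits until the task thread is paused inside the watermark callback. */
    void awaitBlocked() {
        try { blocked.await(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
    }

    /** Ends the simulated sleep. */
    void resume() { resumeSignal.countDown(); }

    /** Sends the stop marker and waits for the task thread to finish. */
    void stopAndWait() {
        events.add("stop");
        try { task.join(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
    }
}
```

Enqueuing "watermark1" followed by "element2" and then calling awaitBlocked() leaves processed empty until resume() is called, which is the freeze described in the question.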

On Thu, May 12, 2016 at 2:31 PM, Matthias J. Sax <[hidden email]> wrote:
> I cannot follow completely. TwoInputStreamOperator defines two methods
> for processing watermarks, one per input stream.
>
> So you can sync both streams within the outer join operator you plan to
> implement.
>
> -Matthias


Re: synchronizing two streams

Matthias J. Sax-2
That is correct. But there is no reason to throttle an input stream.

If you implement an outer join, you will have two in-memory buffers,
each holding the records of one stream that fall within your "time
window". Each time you receive a watermark, you can remove all "expired"
records from the buffer of the other stream. Furthermore, you need to
track whether a record got joined or not. For every record that did not
get joined, emit a "record-null" (or "null-record") result tuple before
removing it.

No need to block/sleep.

Does this make sense?


-Matthias
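
A minimal sketch of this buffering scheme, written as plain Java rather than against the Flink API. The class BufferedOuterJoin, the string-valued records, and the expiry rule are assumptions for illustration: a watermark w on one input is taken to mean that no further record with timestamp <= w arrives on that input, so a record of the other input with timestamp t can no longer find a partner once t + window <= w.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of the buffered outer join; not Flink code, all names are hypothetical.
final class BufferedOuterJoin {
    private static final class Rec {
        final long ts;
        final String value;
        boolean joined;                               // tracks whether the record found a partner
        Rec(long ts, String value) { this.ts = ts; this.value = value; }
    }

    private final long windowMs;                      // maximum timestamp distance of a match
    private final List<Rec> buffer1 = new ArrayList<>();
    private final List<Rec> buffer2 = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    BufferedOuterJoin(long windowMs) { this.windowMs = windowMs; }

    void processElement1(long ts, String value) { join(new Rec(ts, value), buffer1, buffer2, true); }
    void processElement2(long ts, String value) { join(new Rec(ts, value), buffer2, buffer1, false); }

    // A watermark on one input expires records buffered for the other input.
    void processWatermark1(long watermark) { expire(buffer2, watermark, false); }
    void processWatermark2(long watermark) { expire(buffer1, watermark, true); }

    private void join(Rec rec, List<Rec> own, List<Rec> other, boolean recIsLeft) {
        for (Rec o : other) {
            if (Math.abs(rec.ts - o.ts) <= windowMs) {
                rec.joined = true;
                o.joined = true;
                output.add(recIsLeft ? pair(rec.value, o.value) : pair(o.value, rec.value));
            }
        }
        own.add(rec);                                 // keep it for partners that arrive later
    }

    private void expire(List<Rec> buffer, long watermark, boolean bufferIsLeft) {
        for (Iterator<Rec> it = buffer.iterator(); it.hasNext(); ) {
            Rec r = it.next();
            if (r.ts + windowMs > watermark) continue; // may still find a partner
            if (!r.joined) {                           // outer-join padding for unmatched records
                output.add(bufferIsLeft ? pair(r.value, null) : pair(null, r.value));
            }
            it.remove();
        }
    }

    private static String pair(String left, String right) { return "(" + left + ", " + right + ")"; }
}
```

Each buffer shrinks only when the opposite input delivers a watermark, so the memory held by this sketch depends on how closely the two streams advance together.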


On 05/12/2016 02:51 PM, Alexander Gryzlov wrote:

> Hmm, probably I don't really get how Flink's execution model works. As
> far as I understand, the preferred way to throttle down stream
> consumption is to simply have an operator with a conditional
> Thread.sleep() inside. Wouldn't calling sleep() in either
> of TwoInputStreamOperator's processWatermarkN() methods just freeze the
> entire operator, stopping the consumption of both streams (as opposed to
> just one)?
>
> Alex

Re: synchronizing two streams

Alexander Gryzlov
Yes, this is generally a viable design, and is actually something we started off with.

The problem in our case, however, is that either stream can occasionally get stuck (due to issues with an external producer) for an arbitrary period of time, up to several hours. Buffering the other stream for that long would simply exhaust memory: the streams' rates are dozens or even hundreds of Mb/sec.

Alex
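
To put a rough number on this, the helper below multiplies a constant rate by the length of a stall. The class name BufferEstimate and the example figures are hypothetical, and "Mb" is read here as megabytes; if megabits were meant, the result is eight times smaller.

```java
// Back-of-the-envelope helper; the class name and the example figures are hypothetical.
final class BufferEstimate {
    /** Bytes that pile up when one stream is buffered at a constant rate while the other is stalled. */
    static long bufferedBytes(long bytesPerSecond, long stallSeconds) {
        return Math.multiplyExact(bytesPerSecond, stallSeconds); // throws on overflow instead of wrapping
    }

    public static void main(String[] args) {
        long oneHour = bufferedBytes(100_000_000L, 3_600L);   // 100 MB/s for one hour
        System.out.println(oneHour / 1_000_000_000L + " GB"); // prints "360 GB"
    }
}
```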

On Thu, May 12, 2016 at 4:00 PM, Matthias J. Sax <[hidden email]> wrote:
> That is correct. But there is no reason to throttle an input stream.
>
> If you implement an outer join, you will have two in-memory buffers,
> each holding the records of one stream that fall within your "time
> window". Each time you receive a watermark, you can remove all "expired"
> records from the buffer of the other stream. Furthermore, you need to
> track whether a record got joined or not. For every record that did not
> get joined, emit a "record-null" (or "null-record") result tuple before
> removing it.
>
> No need to block/sleep.
>
> Does this make sense?
>
> -Matthias



Re: synchronizing two streams

Matthias J. Sax-2
I see. But even if you had an operator (A,B)->(A,B), it would not
be possible to block A when B does not deliver any data, because of
Flink's internal design.

You will need a custom solution: something like a map (one
for each stream) that uses a side-communication channel (i.e., external to
Flink). The maps could send heartbeats to each other as long as
input data is available. As long as heartbeats are received, data is
forwarded. If no heartbeats arrive from the other map, it indicates
that the other stream lacks data, and forwarding can block to
throttle the map's own stream.

-Matthias
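
One way to picture the heartbeat idea is the sketch below. Everything in it is an assumption made for illustration: the classes HeartbeatGate and ThrottlingMap are hypothetical, the "side channel" is plain shared memory (which only works if both maps run in the same JVM; a real deployment would need a network channel), and the clock is injected so that the behaviour can be checked without waiting.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch of the heartbeat idea; the side channel is just shared memory here.
final class HeartbeatGate {
    private final AtomicLong lastPeerHeartbeatMs = new AtomicLong(Long.MIN_VALUE);
    private final long timeoutMs;
    private final LongSupplier clock;   // e.g. System::currentTimeMillis

    HeartbeatGate(long timeoutMs, LongSupplier clock) {
        this.timeoutMs = timeoutMs;
        this.clock = clock;
    }

    /** Called through the side channel whenever the other stream's map has input. */
    void onPeerHeartbeat() { lastPeerHeartbeatMs.set(clock.getAsLong()); }

    /** True while the other stream has shown activity within the timeout. */
    boolean peerAlive() {
        long last = lastPeerHeartbeatMs.get();
        return last != Long.MIN_VALUE && clock.getAsLong() - last <= timeoutMs;
    }
}

final class ThrottlingMap<T> {
    private final HeartbeatGate own;    // receives heartbeats from the other stream's map
    private final HeartbeatGate peer;   // gate that belongs to the other stream's map

    ThrottlingMap(HeartbeatGate own, HeartbeatGate peer) {
        this.own = own;
        this.peer = peer;
    }

    /** Forwards the record unchanged, blocking while the other stream is silent. */
    T map(T record) {
        while (true) {
            peer.onPeerHeartbeat();     // we have input, so keep telling the other side
            if (own.peerAlive()) return record;
            try {
                Thread.sleep(10);       // blocking here holds back only this map's stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while throttled", e);
            }
        }
    }
}
```

The map keeps sending heartbeats while it waits, so the other side sees this stream as alive for as long as a record is pending. The timeout and the polling interval are arbitrary choices that would need tuning against the allowed timestamp distance of the join.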


On 05/12/2016 03:36 PM, Alexander Gryzlov wrote:

> Yes, this is generally a viable design, and is actually something we
> started off with.
>
> The problem in our case is, however, that either of the streams can
> occasionally (due to external producer's issues) get stuck for an
> arbitrary period of time, up to several hours. Buffering the other one
> during all this time would just blow the memory - streams' rates are
> dozens or even hundreds of Mb/sec.
>
> Alex