Event time join


Gytis Žilinskas
Hi,

we're considering Flink for a couple of our projects. I'm doing a
trial implementation for one of them. So far, I like a lot of things,
however there are a couple of issues that I can't figure out how to
resolve. Not sure if it's me misunderstanding the tool, or Flink just
doesn't have the capability to do it.

We want to do an event time join on two big Kafka streams. Both of
them might experience some issues on the other end and be delayed.
Additionally, while both are big, one (let's call it stream A) is
significantly larger than stream B.

We also know that the join window is around 5min. That is, given some
key K in stream B, if there is a counterpart in stream A, it's going
to be within +/- 5min in event time.

Since stream A is especially heavy and it's infeasible to keep hours
of it in memory, I would imagine an ideal solution where we read both
streams from Kafka. We always make sure that stream B is ahead by
10min, that is, if stream A is currently ahead in watermarks, we stall
it and consume stream B until it catches up. Once the streams are
aligned in event time (with the 10min delay window) we run them both
through the join.

The problem is that I can't find a mechanism to implement that in Flink. If
I try to do a CoProcessFunction then it just consumes both streams at
the same time, ingests a lot of messages from stream A, runs out of
memory and dies.

Any ideas on how this could be solved?

(here's a thread with a very similar problem from some time ago
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html)

Regards,
Gytis

Re: Event time join

Fabian Hueske-2
Hi Gytis,

Flink currently does not support holding back individual streams; for example, it is not possible to align streams on (offset) event-time.

However, the Flink community is working on a windowed join for the DataStream API that only holds the relevant tail of the stream as state.
If your join condition is +/- 5 minutes, then the join would store the last five minutes of both streams as state. Here's an implementation of the operator [1] that is close to being merged and will be available in Flink 1.6.0.
Flink's SQL (and the Table API) has supported this join type since version 1.4.0 [2].
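
As a rough illustration of "holding only the relevant tail as state", here is a minimal plain-Java sketch. It is not the operator linked above and uses no Flink API: the class `IntervalJoinSketch`, its method names and the `key:tsA:tsB` match format are made up for this example, and the value passed to `onWatermark` is assumed to be the combined watermark of both inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; NOT Flink's operator, all names are hypothetical.
final class IntervalJoinSketch {
    // Join condition: |tsA - tsB| <= 5 minutes (timestamps in milliseconds).
    static final long BOUND_MS = 5 * 60 * 1000L;

    // Per-key buffers with the timestamps of events seen so far on each side.
    private final Map<String, List<Long>> bufferA = new HashMap<>();
    private final Map<String, List<Long>> bufferB = new HashMap<>();
    final List<String> matches = new ArrayList<>();

    void onA(String key, long ts) { probeAndStore(key, ts, bufferB, bufferA, true); }
    void onB(String key, long ts) { probeAndStore(key, ts, bufferA, bufferB, false); }

    // Probe the other side's buffer for partners, then remember this event.
    private void probeAndStore(String key, long ts,
                               Map<String, List<Long>> other,
                               Map<String, List<Long>> own,
                               boolean isA) {
        List<Long> candidates = other.getOrDefault(key, Collections.<Long>emptyList());
        for (long otherTs : candidates) {
            if (Math.abs(ts - otherTs) <= BOUND_MS) {
                long a = isA ? ts : otherTs;
                long b = isA ? otherTs : ts;
                matches.add(key + ":" + a + ":" + b);
            }
        }
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(ts);
    }

    // Once the watermark has passed ts + bound, no future event can match ts,
    // so the buffered entry can be dropped.
    void onWatermark(long watermark) {
        evict(bufferA, watermark);
        evict(bufferB, watermark);
    }

    private static void evict(Map<String, List<Long>> buffer, long watermark) {
        buffer.values().forEach(list -> list.removeIf(ts -> ts + BOUND_MS < watermark));
        buffer.values().removeIf(List::isEmpty);
    }

    int bufferedCount() {
        int n = 0;
        for (List<Long> l : bufferA.values()) n += l.size();
        for (List<Long> l : bufferB.values()) n += l.size();
        return n;
    }
}
```

The state stays bounded only as long as the watermark keeps advancing, which is why the eviction is tied to `onWatermark` rather than to the arrival of events.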

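Best, Fabian

[1] https://github.com/apache/flink/pull/5342
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins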


Re: Event time join

Vishal Santoshi
This is very interesting. I would imagine that there will be high back pressure on the LEFT source, effectively throttling it, but in the current state that is likely to affect other pipelines, as the free output buffers on the source side and the input buffers on the consumer side start blocking and get exhausted for all other pipes. I am very interested in how holding back the busy source does not create a pathological issue where that source is held back forever. Is there a FLIP for it?


Re: Event time join

Vishal Santoshi
Aah we have it here
https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6




Re: Event time join

Fabian Hueske-2
The join would not cause backpressure but rather put all events that cannot be processed yet into state to process them later.
So this works well if the data that is provided by the streams is roughly aligned by event time.
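
To make the "put into state and process later" point concrete, here is a small plain-Java sketch under one assumption I am confident about: a two-input operator's event-time clock follows the slower input, i.e. the minimum of both input watermarks. The class `TwoInputBufferSketch` and its methods are hypothetical names for illustration and are not Flink API.

```java
import java.util.PriorityQueue;

// Plain-Java illustration only; names are hypothetical, no Flink API is used.
final class TwoInputBufferSketch {
    private long watermarkA = Long.MIN_VALUE;
    private long watermarkB = Long.MIN_VALUE;

    // Timestamps of events that cannot be processed yet, smallest first.
    private final PriorityQueue<Long> held = new PriorityQueue<>();

    // The operator's event-time clock is the minimum of both input watermarks.
    long combinedWatermark() {
        return Math.min(watermarkA, watermarkB);
    }

    // Every incoming event is put into the buffer first.
    void onEvent(long ts) {
        held.add(ts);
    }

    // Advance one input's watermark and return how many held events were released.
    int advanceA(long wm) {
        watermarkA = Math.max(watermarkA, wm);
        return release();
    }

    int advanceB(long wm) {
        watermarkB = Math.max(watermarkB, wm);
        return release();
    }

    // Release everything at or below the combined watermark.
    private int release() {
        int released = 0;
        while (!held.isEmpty() && held.peek() <= combinedWatermark()) {
            held.poll();
            released++;
        }
        return released;
    }

    int heldCount() {
        return held.size();
    }
}
```

If input A runs far ahead, advancing only `watermarkA` releases nothing, so the buffer grows with everything A delivers until B catches up. Nothing in this scheme slows A down, which matches the "no backpressure" behaviour described above.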


Re: Event time join

Vishal Santoshi
Yep. I think this leads to a general question that may not be pertinent to https://github.com/apache/flink/pull/5342: how do we throttle a source if the held-back data gets unreasonably large? I know that is in itself a broader question, but delayed watermarks of a slow stream accentuate the issue. I am curious to know how credit-based back pressure handling plays into this, or is that outside the realm of this discussion? And is credit-based back pressure handling in the 1.5 release?


Re: Event time join

Gytis Žilinskas
Thanks for the answers and discussion, both of you.

The FLIP mentions that the cases where one stream is much faster than
the other one will not be handled for now either, so I guess it would
still not solve our problems. As for the join semantics itself, I
think we achieve the same thing with CoProcessFunction, unless I'm
missing something.

Anyway, a couple more questions then. It seems weird that this issue
isn't talked about or prioritized much more. That leads me to believe
that maybe we're misunderstanding the use case for Flink, or maybe
other users have a different architecture / environment that doesn't
present them with such problems. Could you describe how it is usually
used?

From the documentation and talks it looks like fault tolerance is an
important concept in Flink, so a source pausing or slowing down is
expected. The way I see it, the only options to deal with it at the
moment are:

1) have a cluster size that can buffer everything for as long as
needed and is able to eventually catch up
2) model the behaviour so that the streams that are ahead can go
through after some cutoff time

Do most applications just fall into one of these behaviours?

Finally, are there any ideas about extending the capabilities of the
backpressure mechanism that would allow building some sort of
functionality similar to what I was describing in the initial mail,
with some prioritisation of one of the streams, or other custom
stalling behaviour? (Maybe this credit-based approach Vishal mentions?
The FLIP document is not public, so I can't really tell.)


Thanks again for all the help!
Gytis


Re: Event time join

Fabian Hueske-2
Hi,

A Flink application does not have a problem if it ingests two streams with very different throughput as long as they are somewhat synced on their event-time.
This is typically the case when ingesting real-time data. In such scenarios, an application would not buffer more data than necessary.

When reading two streams of historic data with different "density" (events per time interval) or real-time streams that are off by some time interval, the application needs to buffer more data to compensate for the difference in time.
In case of real-time streams that are off by a (more or less) fixed offset, you should plan for the additional state requirements. Syncing sources to the same event-time would help in both cases.
However, Flink's RocksDB state backend is also pretty good at handling very large state sizes due to asynchronous and incremental checkpointing.

The window join functions of the SQL and Table API are implemented using a CoProcessFunction and so is the new join operator that I pointed to.

Syncing sources is not really related to fault tolerance except that additional state affects the checkpointing and recovery performance.
Pausing sources can cause problems because watermarks do not advance when no data is ingested, but again this is not related to fault tolerance.

The credit-based network transfer will be included in Flink 1.5. However, this is not related to the question discussed here.
It only applies to cases where an operator cannot continue processing, for example if the function call does not return.
An operator cannot decide to block a particular input and process the other one.

Long story short.
If you join two streams on event time, you need to buffer the data for the join window + the event time difference between both streams.
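
As a back-of-the-envelope sketch of that rule in Java: the helper names and the example rate of 50,000 events per second are assumptions for illustration only, not numbers from this thread, and real state size also depends on event size, keys and serialization.

```java
// Rough sizing helper; names and example numbers are hypothetical.
final class JoinStateEstimate {
    // Seconds of data to keep: join window plus event-time difference between the streams.
    static long bufferedSeconds(long joinWindowSeconds, long skewSeconds) {
        return joinWindowSeconds + skewSeconds;
    }

    // Approximate number of events of one stream held in state at a steady rate.
    static long bufferedEvents(long eventsPerSecond, long joinWindowSeconds, long skewSeconds) {
        return eventsPerSecond * bufferedSeconds(joinWindowSeconds, skewSeconds);
    }
}
```

With a 5 minute window and no skew, `bufferedEvents(50_000, 300, 0)` gives 15,000,000 events. If the streams are one hour apart in event time, `bufferedEvents(50_000, 300, 3_600)` gives 195,000,000, so the skew dominates the state size long before the join window does.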

Best, Fabian

