Hi,
we're considering Flink for a couple of our projects. I'm doing a trial implementation for one of them. So far, I like a lot of things, however there are a couple of issues that I can't figure out how to resolve. Not sure if it's me misunderstanding the tool, or Flink just doesn't have the capability to do it.

We want to do an event-time join on two big Kafka streams. Both of them might experience some issues on the other end and be delayed. Additionally, while both are big, one (let's call it stream A) is significantly larger than stream B.

We also know that the join window is around 5 min. That is, given some key K in stream B, if there is a counterpart in stream A, it's going to be within +/- 5 min in event time.

Since stream A is especially heavy and it's infeasible to keep hours of it in memory, I would imagine an ideal solution where we read both streams from Kafka and always make sure that stream B is ahead by 10 min. That is, if stream A is currently ahead in watermarks, we stall it and consume stream B until it catches up. Once the streams are aligned in event time (with the 10 min offset), we run them both through the join.

The problem is that I can't find a mechanism to implement that in Flink. If I try to use a CoProcessFunction, it just consumes both streams at the same time, ingests a lot of messages from stream A, runs out of memory and dies.

Any ideas on how this could be solved?

(Here's a thread with a very similar problem from some time ago: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html)

Regards,
Gytis
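The read order described above (stall stream A whenever it would get closer than 10 minutes to stream B in event time) can be simulated outside Flink as a merge of two timestamp-sorted inputs in which A's timestamps are shifted by the offset. This is a minimal plain-Java sketch, not a Flink API: the `OffsetAlignedMerge` class, its `Event` record and the `merge` helper are hypothetical, and it assumes each input is already sorted by event time (a Kafka topic with several partitions generally is not).

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetAlignedMerge {
    // Hypothetical event: the stream it came from and its event timestamp.
    record Event(String stream, long timestamp) {}

    // Merges two per-stream, timestamp-sorted lists so that stream B is always
    // consumed at least `offset` ahead of stream A in event time.
    // NOT a Flink API; it only simulates the read order such alignment would produce.
    static List<Event> merge(List<Event> a, List<Event> b, long offset) {
        List<Event> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() || j < b.size()) {
            boolean aAvailable = i < a.size();
            boolean bAvailable = j < b.size();
            // Keep reading B while its next event is not later than A's next event
            // shifted by the offset; otherwise A is "unstalled" for one event.
            if (bAvailable && (!aAvailable || b.get(j).timestamp() <= a.get(i).timestamp() + offset)) {
                out.add(b.get(j++));
            } else {
                out.add(a.get(i++));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L; // timestamps in milliseconds
        List<Event> a = List.of(new Event("A", 0), new Event("A", 5 * min), new Event("A", 15 * min));
        List<Event> b = List.of(new Event("B", 0), new Event("B", 10 * min), new Event("B", 20 * min));
        for (Event e : merge(a, b, 10 * min)) {
            System.out.println(e.stream() + "@" + e.timestamp() / min + "min");
        }
    }
}
```

Running `main` prints B@0min, B@10min, A@0min, A@5min, B@20min, A@15min: every A event is read only after B has progressed to roughly 10 minutes past it, so a join downstream would only need to hold a short tail of A.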
Hi Gytis,

Flink does not currently support holding back individual streams; for example, it is not possible to align streams on (offset) event time.

However, the Flink community is working on a windowed join for the DataStream API that only holds the relevant tail of the stream as state. If your join condition is +/- 5 minutes, the join would store the last five minutes of both streams as state. Here's an implementation of the operator [1] that is close to being merged and will be available in Flink 1.6.0.

Flink's SQL support (and the Table API) has supported this join type since version 1.4.0 [2].

Best, Fabian

[1] https://github.com/apache/flink/pull/5342
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins

2018-03-08 1:02 GMT-08:00 Gytis Žilinskas <[hidden email]>:

This is very interesting. I would imagine that there will be high backpressure on the LEFT source, effectively throttling it, but as things stand that is likely to affect other pipelines, as the free output buffers on the source side and the input buffers on the consumer side start blocking and get exhausted for all other pipes. I am very interested in how holding back the busy source does not create a pathological issue where that source is forever held back. Is there a FLIP for it?

On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske <[hidden email]> wrote:

Aah, we have it here: https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi <[hidden email]> wrote:

The join would not cause backpressure; rather, it puts all events that cannot be processed yet into state and processes them later. So this works well if the data provided by the streams is roughly aligned by event time.

2018-03-08 9:04 GMT-08:00 Vishal Santoshi <[hidden email]>:

Yep. I think this leads to a general question that may not be pertinent to https://github.com/apache/flink/pull/5342: how do we throttle a source if the held-back data gets unreasonably large? I know that this is in itself a broader question, but delayed watermarks of a slow stream accentuate the issue. I am curious how credit-based backpressure handling plays into this, or is that outside the realm of this discussion? And is credit-based backpressure handling in the 1.5 release?

On Thu, Mar 8, 2018 at 12:23 PM, Fabian Hueske <[hidden email]> wrote:

Thanks for the answers and discussion, both of you.

The FLIP mentions that cases where one stream is much faster than the other will not be handled for now either, so I guess it still would not solve our problems. As for the join semantics itself, I think we achieve the same thing with a CoProcessFunction, unless I'm missing something.

Anyway, a couple more questions then.

It seems weird that this issue isn't talked about or prioritized much more. That leads me to believe that maybe we're misunderstanding the use case for Flink, or maybe other users have a different architecture / environment that doesn't present them with such problems. Could you describe how it is usually used? From the documentation and talks, it looks like fault tolerance is an important concept in Flink, so a source pausing or slowing down is expected. The way I see it, the only options to deal with it at the moment are:
1) have a cluster size that can buffer everything for as long as needed and is able to eventually catch up
2) model the behaviour so that the streams that are ahead can go through after some cutoff time
Do most applications just fall into one of these behaviours?

Finally, are there some ideas about extending the capabilities of the backpressure mechanism that would allow building some sort of functionality similar to what I was describing in the initial mail? With some prioritisation of one of the streams, or other custom stalling behaviour. (Maybe this credit-based approach Vishal mentions? The FLIP document is not public, so I can't really tell.)

Thanks again for all the help!
Gytis

On Thu, Mar 8, 2018 at 7:48 PM, Vishal Santoshi <[hidden email]> wrote:
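The windowed join Fabian describes (buffer events that cannot be joined yet, but keep only the relevant tail of each stream as state) can be illustrated with a plain-Java simulation. Everything below (`IntervalJoinSketch`, `onLeft`, `onRight`, `onWatermark`) is hypothetical and is not the code of the operator in the pull request; it ignores late events, checkpointing and Flink's keyed state, and only shows the buffering and pruning logic for a `|tsLeft - tsRight| <= bound` condition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IntervalJoinSketch {
    // Hypothetical event: join key, event timestamp (ms) and a payload label.
    record Event(String key, long ts, String payload) {}

    private final long boundMs; // join condition: |tsLeft - tsRight| <= boundMs
    // Per-key buffers ("state") for each input.
    private final Map<String, List<Event>> leftState = new HashMap<>();
    private final Map<String, List<Event>> rightState = new HashMap<>();
    final List<String> output = new ArrayList<>();

    IntervalJoinSketch(long boundMs) {
        this.boundMs = boundMs;
    }

    void onLeft(Event e) {
        joinAndBuffer(e, leftState, rightState, true);
    }

    void onRight(Event e) {
        joinAndBuffer(e, rightState, leftState, false);
    }

    // Emit every match against the other side's buffer, then buffer this event
    // so that later events from the other side can still find it.
    private void joinAndBuffer(Event e, Map<String, List<Event>> own,
                               Map<String, List<Event>> other, boolean fromLeft) {
        for (Event o : other.getOrDefault(e.key(), List.of())) {
            if (Math.abs(o.ts() - e.ts()) <= boundMs) {
                Event left = fromLeft ? e : o;
                Event right = fromLeft ? o : e;
                output.add(left.payload() + "+" + right.payload());
            }
        }
        own.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
    }

    // `watermark` is assumed to be the minimum of both inputs' watermarks.
    // Assuming no late events, nothing arriving later is older than the watermark,
    // so a buffered event with ts < watermark - bound can never match again.
    void onWatermark(long watermark) {
        long cutoff = watermark - boundMs;
        for (Map<String, List<Event>> state : List.of(leftState, rightState)) {
            state.values().forEach(events -> events.removeIf(ev -> ev.ts() < cutoff));
            state.values().removeIf(List::isEmpty);
        }
    }

    int bufferedEvents() {
        int n = 0;
        for (List<Event> events : leftState.values()) n += events.size();
        for (List<Event> events : rightState.values()) n += events.size();
        return n;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        IntervalJoinSketch join = new IntervalJoinSketch(5 * min);
        join.onLeft(new Event("k", 0, "a@0"));
        join.onRight(new Event("k", 3 * min, "b@3")); // within 5 min of a@0: joined
        join.onRight(new Event("k", 9 * min, "b@9")); // 9 min from a@0: not joined
        join.onWatermark(10 * min);                   // prunes everything older than 5 min
        System.out.println(join.output);              // [a@0+b@3]
        System.out.println(join.bufferedEvents());    // 1 (only b@9 is kept)
    }
}
```

Because pruning is driven by the minimum watermark of both inputs, a stream whose watermark lags behind stops the other side's buffer from shrinking. That is exactly the extra state a heavy stream A accumulates while stream B is delayed.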
Hi,

A Flink application does not have a problem if it ingests two streams with very different throughput, as long as they are somewhat synced on their event time. This is typically the case when ingesting real-time data. In such scenarios, an application would not buffer more data than necessary.

When reading two streams of historic data with different "density" (events per time interval), or real-time streams that are off by some time interval, the application needs to buffer more data to compensate for the difference in time. In the case of real-time streams that are off by a (more or less) fixed offset, you should plan for the additional state requirements. Syncing sources to the same event time would help in both cases. However, Flink's RocksDB state backend is also pretty good at handling very large state sizes due to asynchronous and incremental checkpointing.

The window join functions of the SQL and Table API are implemented using a CoProcessFunction, and so is the new join operator that I pointed to.

Syncing sources is not really related to fault tolerance, except that additional state affects checkpointing and recovery performance. Pausing sources can cause problems because watermarks do not advance when no data is ingested, but again this is not related to fault tolerance. It only applies to cases where an operator cannot continue processing, for example if the function call does not return. An operator cannot decide to block a particular input and process the other one.

Long story short: if you join two streams on event time, you need to buffer the data for the join window plus the event-time difference between both streams.

Best, Fabian

2018-03-09 9:28 GMT+01:00 Gytis Žilinskas <[hidden email]>:
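The rule of thumb above can be turned into a back-of-the-envelope estimate: the number of events one input keeps in state is roughly its event rate multiplied by (join window + event-time difference). The rate, lag and event size below are made-up numbers for illustration only, not measurements from this thread, and the formula ignores per-entry overhead of the state backend.

```java
public class JoinStateEstimate {
    // Rough count of events one join input keeps in state:
    // rate * (join window + event-time difference between the streams).
    static long bufferedEvents(long eventsPerSecond, long joinWindowSeconds, long eventTimeLagSeconds) {
        return eventsPerSecond * (joinWindowSeconds + eventTimeLagSeconds);
    }

    public static void main(String[] args) {
        // Hypothetical inputs, for illustration only.
        long eventsPerSecond = 50_000; // rate of the heavy stream A
        long joinWindow = 5 * 60;      // +/- 5 min condition: ~5 min of each stream
        long lag = 10 * 60;            // stream B lagging 10 min behind in event time
        long bytesPerEvent = 200;      // assumed serialized size per event

        long aligned = bufferedEvents(eventsPerSecond, joinWindow, 0);
        long lagging = bufferedEvents(eventsPerSecond, joinWindow, lag);
        System.out.printf("aligned: %d events (~%.1f GB)%n", aligned, aligned * bytesPerEvent / 1e9);
        System.out.printf("lagging: %d events (~%.1f GB)%n", lagging, lagging * bytesPerEvent / 1e9);
    }
}
```

With these numbers, a 10-minute lag triples the state compared to the aligned case (900 s instead of 300 s of stream A), which is the additional state Fabian suggests planning for.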