Hello,
I have two data streams and want to join them using a tumbling window. Each stream has at most one record per window. There is also a requirement to log/save the records that don't have a companion from the other stream.

What would be the best option for my case? Would it be possible to use Flink's Join?

I tried using a CoProcessFunction: I truncate the timestamp of each record to the beginning of its tumbling window, and then "keyBy" the two streams using (key, truncated-timestamp). When I receive a record from one stream, if it is the first record of the pair, I save it to a MapState. If it is the second record, I merge it with the first one and then fire.

This implementation works, but (a) I feel that it's over-complicated, and (b) I am concerned that when one stream is slower than the other, my cached data would build up and make my cluster run out of memory. Would back-pressure kick in for this case?

Thanks and best regards,
Averell

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
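For readers following the thread, the pairing logic described in this post can be modelled roughly as below. This is a minimal, Flink-free sketch, not Averell's actual code: the class `WindowPairingSketch`, its method names, and the string record format are all hypothetical, and a plain `HashMap` stands in for the keyed MapState. It assumes, as stated in the post, at most one record per stream per window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Flink-free model of the pairing logic; every name here is hypothetical. */
final class WindowPairingSketch {

    /** Truncates an event timestamp to the start of its tumbling window. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Stands in for keyed MapState: "key@windowStart" -> first record seen.
    private final Map<String, String> pending = new HashMap<>();
    private final long windowSizeMs;

    WindowPairingSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /**
     * Buffers the first record of a (key, window) slot and returns null;
     * when the companion arrives, removes the buffered record and returns the merged pair.
     */
    String onRecord(String source, String key, long timestampMs, String value) {
        String slot = key + "@" + windowStart(timestampMs, windowSizeMs);
        String first = pending.remove(slot);
        if (first == null) {
            pending.put(slot, source + ":" + value);
            return null;
        }
        return first + "+" + source + ":" + value;
    }

    /** Removes and returns every record that never found a companion, e.g. to log it. */
    List<String> drainUnmatched() {
        List<String> unmatched = new ArrayList<>(pending.values());
        pending.clear();
        return unmatched;
    }
}
```

Calling `onRecord` once per incoming record from either stream returns the merged pair only on the second arrival; whatever `drainUnmatched` returns corresponds to the records that would have to be logged or saved.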
Hi Averell,

> I feel that it's over-complicated

I think a Table API or SQL[1] job can also achieve what you want. It is probably simpler and takes less code. The workflow looks like this:

1. Union all the two source tables. You may need to unify the schemas of the two tables, as union all can only be used on tables with the same schema.
2. Perform a window group by, i.e., group by tumbling window + key.
3. Write a user-defined aggregate function[2] to merge the data.

> my cached data would build up and make my cluster out-of-memory.

You can use the `RocksDBStateBackend`[3]. The amount of state that you can keep is then limited only by the amount of disk space available.

> Would back-pressure kicks in for this case?

It seems there is no direct way to align different sources at the moment. However, the community has already discussed this and is trying to solve it[4].

Best,
Hequn

On Mon, Apr 15, 2019 at 8:08 PM Averell <[hidden email]> wrote:
> Hello,
Thank you Hecheng.
I just tried the Table API as you suggested, and it almost worked, with the two issues below:

- I only get the output when my event-time watermark goes past the end of the tumbling window. But because I know that there are at most 2 records per window (one from each stream), I would like to collect my output record as soon as I have received both input records. With the low-level API, I believe I can do this with a Trigger. Can I achieve a similar result with the Table API?
- In the UDAggF documentation, I saw a recommendation to use Java instead of Scala. Does this apply to the low-level API functions as well?

Thanks and best regards,
Averell
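The early-firing behaviour asked about in the first point can be modelled as a small decision function. This is a Flink-free sketch under stated assumptions: `EarlyFireTriggerSketch` and its methods are hypothetical names, the `Result` values only borrow the names of Flink's trigger results, and the watermark check is simplified to "watermark has reached the window end". It says nothing about whether the Table API exposes such a hook.

```java
/** Flink-free model of a "fire on second record, else at window end" trigger; names are hypothetical. */
final class EarlyFireTriggerSketch {

    /** Mirrors the names of two Flink trigger results; this enum itself is not Flink's. */
    enum Result { CONTINUE, FIRE_AND_PURGE }

    private final int expectedRecords;
    private int seen = 0;

    EarlyFireTriggerSketch(int expectedRecords) {
        this.expectedRecords = expectedRecords;
    }

    /** Called per record: fires as soon as the expected number of records has arrived. */
    Result onElement() {
        seen++;
        if (seen >= expectedRecords) {
            seen = 0;
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    /** Called when the watermark advances: flushes a lone record once the window has ended. */
    Result onWatermark(long watermarkMs, long windowEndMs) {
        if (seen > 0 && watermarkMs >= windowEndMs) {
            seen = 0;
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }
}
```

With `expectedRecords` set to 2, a complete pair is emitted on the second `onElement` call without waiting for the watermark, while an unmatched record is still released at the end of its window.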
Hi Averell,
I think your original solution is the right one, given your requirements. I don't think it is over-complicated.

As for the memory concerns, there is no built-in mechanism for backpressure/alignment based on event time. The community did take that into consideration when discussing the new source interface, though[1]. But as Hequn already mentioned, if you use the RocksDBStateBackend, the amount of state is limited only by the disk space. Moreover, you could add a safety timer that fires every x minutes and clears the oldest entries.

Best,
Dawid

[1] https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E

On 19/04/2019 05:15, Averell wrote:
> Thank you Hecheng.
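The safety-timer idea mentioned in this reply could look roughly like the following. It is a Flink-free sketch, not Dawid's code: `SafetyTimerSketch`, `buffer`, and `onTimer` are hypothetical names, the `TreeMap` stands in for the per-key state, and the maximum age is an assumed parameter that the thread leaves open ("every x minutes").

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Flink-free model of a periodic cleanup of stale, unmatched records; names are hypothetical. */
final class SafetyTimerSketch {

    // State for a single key: windowStart -> buffered unmatched record.
    // A TreeMap keeps the oldest windows first.
    private final TreeMap<Long, String> pending = new TreeMap<>();

    void buffer(long windowStartMs, String record) {
        pending.put(windowStartMs, record);
    }

    /**
     * Called by the periodic timer: removes and returns, oldest first, every entry
     * whose window started more than maxAgeMs before nowMs.
     */
    List<String> onTimer(long nowMs, long maxAgeMs) {
        // headMap is a live view, so clearing it also removes the entries from 'pending'.
        Map<Long, String> expired = pending.headMap(nowMs - maxAgeMs, false);
        List<String> evicted = new ArrayList<>(expired.values());
        expired.clear();
        return evicted;
    }

    int size() {
        return pending.size();
    }
}
```

Returning the evicted records rather than silently dropping them keeps the original requirement in reach: records without a companion can still be logged or saved before their state is released.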
Hi Dawid,
I just tried changing from a CoProcessFunction with onTimer() to a ProcessWindowFunction with a Trigger and a TumblingWindow, so I can key my stream by (id) instead of (id, eventTime). With this, I can use reinterpretAsKeyedStream, and I hope that it will give better performance. I can also use the out-of-the-box function sideOutputLateData(), though I am not sure whether I would really benefit from that.

Thanks and regards,
Averell