Identify orphan records after joining two streams

Averell
Hello,

I have two data streams and want to join them using a tumbling window. Each
of the streams has at most one record per window. There is also a
requirement to log/save the records that don't have a companion from the
other stream.
What would be the best option for my case? Is it possible to use
Flink's join for this?

I tried to use a CoProcessFunction: I truncate the timestamp of each record to
the beginning of its tumbling window, and then "keyBy" both streams on
(key, truncated-timestamp). When I receive a record from one stream and it is
the first record of the pair, I save it to a MapState. If it is
the second record, I merge it with the first one and emit the result.
This implementation works, but
(a) I feel that it's over-complicated, and
(b) I am concerned that when one stream is slower than the other, my
cached data would build up and run my cluster out of memory. Would
back-pressure kick in in this case?
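For illustration, here is a minimal plain-Python model of the pairing logic described above; it is not Flink code. A dict stands in for the keyed MapState, and it assumes (as stated) at most one record per stream per window. The window size, the Record class, the Pairer name, and the "A"/"B" stream tags are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

WINDOW_SIZE_MS = 60_000  # hypothetical tumbling-window size (1 minute)


def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Truncate a non-negative event timestamp to the start of its tumbling window."""
    return ts_ms - ts_ms % size_ms


@dataclass
class Record:  # hypothetical record type
    key: str
    ts_ms: int
    payload: str


class Pairer:
    """Plain-Python model of the CoProcessFunction: one pending record per (key, window start)."""

    def __init__(self) -> None:
        # Stand-in for MapState; values remember which stream ("A" or "B") the record came from.
        self.pending: Dict[Tuple[str, int], Tuple[str, Record]] = {}
        self.joined: List[Tuple[Record, Record]] = []

    def process(self, side: str, rec: Record) -> None:
        state_key = (rec.key, window_start(rec.ts_ms))
        cached = self.pending.pop(state_key, None)
        if cached is None:
            # First record of the pair: cache it until its companion arrives.
            self.pending[state_key] = (side, rec)
            return
        # Second record: merge with the cached one, always ordered as (A, B).
        cached_side, cached_rec = cached
        pair = (cached_rec, rec) if cached_side == "A" else (rec, cached_rec)
        self.joined.append(pair)

    def orphans(self) -> List[Record]:
        """Records still without a companion, i.e. what a timer would flush."""
        return [rec for _, rec in self.pending.values()]
```

Whatever is still in `pending` when a window's timer fires is the set of orphans to log or save; in the actual job that flush would happen in onTimer().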

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Identify orphan records after joining two streams

Hequn Cheng
Hi Averell,

> I feel that it's over-complicated
I think a Table API or SQL[1] job can also achieve what you want, probably more simply and with less code.
The workflow would look like this:
1. UNION ALL the two source tables. You may need to unify the schemas of the two tables first, as UNION ALL only works on tables with the same schema.
2. Perform a window group by, i.e., group by tumbling window + key.
3. Write a user-defined aggregate function[2] that merges the data.
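The three steps can be modelled in plain Python to show how orphans fall out of the aggregation. This is a sketch of the idea, not the Table API itself: rows from both streams get a hypothetical `source` column so they share one schema, are grouped by (key, window start), and are folded by a toy accumulator whose `accumulate`/`get_value` methods only loosely mirror a Table API aggregate function. A group with one side still `None` is an orphan. The window size and all names are assumptions.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

WINDOW_SIZE_MS = 60_000  # hypothetical tumbling-window size

Input = Tuple[str, int, str]  # (key, event_ts_ms, payload)
Result = Tuple[str, int, Optional[str], Optional[str]]  # (key, window_start, a, b)


class PairAccumulator:
    """Toy stand-in for a user-defined aggregate that merges one A row and one B row."""

    def __init__(self) -> None:
        self.a: Optional[str] = None
        self.b: Optional[str] = None

    def accumulate(self, source: str, payload: str) -> None:
        if source == "A":
            self.a = payload
        else:
            self.b = payload

    def get_value(self) -> Tuple[Optional[str], Optional[str]]:
        return self.a, self.b


def window_join(stream_a: Iterable[Input], stream_b: Iterable[Input]) -> Tuple[List[Result], List[Result]]:
    # Step 1: "UNION ALL" after adding a source column so both sides share one schema.
    unioned = [(k, ts, "A", p) for k, ts, p in stream_a] + [(k, ts, "B", p) for k, ts, p in stream_b]
    # Step 2: group by key + tumbling window start.
    groups: Dict[Tuple[str, int], PairAccumulator] = defaultdict(PairAccumulator)
    for key, ts, source, payload in unioned:
        groups[(key, ts - ts % WINDOW_SIZE_MS)].accumulate(source, payload)
    # Step 3: read each aggregate; a missing side marks an orphan.
    matched: List[Result] = []
    orphans: List[Result] = []
    for (key, start), acc in sorted(groups.items(), key=lambda kv: kv[0]):
        a, b = acc.get_value()
        target = matched if a is not None and b is not None else orphans
        target.append((key, start, a, b))
    return matched, orphans
```

For example, a record from stream A and one from stream B with the same key in the same minute come out in `matched`, while a lone record lands in `orphans` with `None` for the missing side.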

> my cached data would build up and run my cluster out of memory.
You can use the `RocksDBStateBackend`[3]. The amount of state that you can keep is only limited by the amount of disk space available. 

> Would back-pressure kick in in this case?
It seems there is currently no direct way to align different sources. However, the community is already discussing this and trying to solve it[4].

Best, Hequn


(In reply to Averell's message of Mon, Apr 15, 2019 at 8:08 PM.)

Re: Identify orphan records after joining two streams

Averell
Thank you, Hequn.

I just tried the Table API as you suggested, and it almost works. There are
two remaining issues:
    - I only get the output when my event-time watermark passes the end
of the tumbling window. But because I know that there are at most two records
per window (one from each stream), I would like to emit my output record
as soon as I receive both input records. With the low-level API, I believe I can
do this with a Trigger. Can I achieve a similar result with the Table API?
    - In the UDAF documentation, I saw a recommendation to use Java instead of
Scala. Does this apply to the low-level API functions as well?
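The firing behaviour asked about in the first point could look like the plain-Python model below: emit as soon as both records of a window are present, otherwise emit the lone record when the watermark closes the window. This is an assumption about the desired semantics, not Flink's Trigger API; in the DataStream API it would roughly correspond to a custom Trigger that returns FIRE_AND_PURGE from onElement() once two elements are counted and lets the event-time timer fire whatever remains. The class name and window size are hypothetical.

```python
from typing import Dict, List, Tuple

WINDOW_SIZE_MS = 60_000  # hypothetical tumbling-window size


class EarlyFiringWindow:
    """Plain-Python model: fire when 2 records are in a window, else when the watermark closes it."""

    def __init__(self) -> None:
        self.contents: Dict[Tuple[str, int], List[str]] = {}
        self.emitted: List[Tuple[str, int, Tuple[str, ...]]] = []

    def on_element(self, key: str, ts_ms: int, payload: str) -> None:
        start = ts_ms - ts_ms % WINDOW_SIZE_MS
        bucket = self.contents.setdefault((key, start), [])
        bucket.append(payload)
        if len(bucket) == 2:
            # Both streams delivered: emit now and purge the window state.
            self._fire(key, start)

    def on_watermark(self, watermark_ms: int) -> None:
        # A window is complete once the watermark reaches its last timestamp (end - 1).
        for key, start in sorted(self.contents):
            if start + WINDOW_SIZE_MS - 1 <= watermark_ms:
                # Only one record arrived: this emission is an orphan.
                self._fire(key, start)

    def _fire(self, key: str, start: int) -> None:
        self.emitted.append((key, start, tuple(self.contents.pop((key, start)))))
```

Purging on the early firing matters: otherwise the watermark would fire the same complete pair a second time.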

Thanks and best regards,
Averell




Re: Identify orphan records after joining two streams

Dawid Wysakowicz-2
Hi Averell,

I think your original solution is the right one, given your
requirements. I don't think it is over-complicated.

As for the memory concerns, there is no built-in mechanism for
backpressure/alignment based on event time. The community did take that
into consideration when discussing the new source interface, though[1].
But as Hequn already mentioned, if you use the RocksDBStateBackend, the
amount of state is limited only by the disk space. Moreover, you could add
a safety timer that fires every x minutes and clears the oldest
entries.
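The safety timer could be modelled as below, interpreting "the oldest entries" as entries whose window started more than a hypothetical age limit ago: a periodic sweep evicts them and hands them back as orphans to log. Plain Python again; the `PendingCache` name and `max_age_ms` parameter are hypothetical, and in a real job the sweep would be driven by a timer registered through the function's TimerService.

```python
from typing import Dict, List, Optional, Tuple


class PendingCache:
    """Cache of unmatched records keyed by (key, window start), with a periodic safety sweep."""

    def __init__(self, max_age_ms: int) -> None:
        self.max_age_ms = max_age_ms  # hypothetical retention limit
        self.pending: Dict[Tuple[str, int], str] = {}

    def put(self, key: str, window_start_ms: int, payload: str) -> None:
        self.pending[(key, window_start_ms)] = payload

    def take(self, key: str, window_start_ms: int) -> Optional[str]:
        # Called when the companion arrives; removes and returns the cached record.
        return self.pending.pop((key, window_start_ms), None)

    def on_safety_timer(self, now_ms: int) -> List[Tuple[str, int, str]]:
        """Evict entries whose window started more than max_age_ms ago and return them as orphans."""
        expired = sorted(k for k in self.pending if now_ms - k[1] > self.max_age_ms)
        return [(key, start, self.pending.pop((key, start))) for key, start in expired]
```

This bounds the cache by time rather than by size, so a slow stream can delay matches but cannot make the state grow without limit.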

Best,

Dawid


[1]
https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E

(In reply to Averell's message of 19/04/2019 05:15.)

Re: Identify orphan records after joining two streams

Averell
Hi Dawid,

I have just switched from a CoProcessFunction with onTimer() to a
ProcessWindowFunction with a Trigger and a tumbling window. This lets me key my
stream by (id) instead of (id, eventTime), so I can use
reinterpretAsKeyedStream, which I hope will give better performance.
I can also use the out-of-the-box sideOutputLateData(), though I'm
not sure whether I would really benefit from that.
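For reference, the sketch below models when an element would be routed to the late-data side output. To my understanding, Flink's window operator treats an element as late when its window's last timestamp plus the allowed lateness is at or before the current watermark. The function names and window size are hypothetical, and each event is assumed to carry the watermark observed when it arrived.

```python
from typing import Iterable, List, Tuple

WINDOW_SIZE_MS = 60_000  # hypothetical tumbling-window size


def is_late(ts_ms: int, watermark_ms: int, allowed_lateness_ms: int = 0) -> bool:
    """Late if the element's window (last timestamp + allowed lateness) is already behind the watermark."""
    window_max_ts = ts_ms - ts_ms % WINDOW_SIZE_MS + WINDOW_SIZE_MS - 1
    return window_max_ts + allowed_lateness_ms <= watermark_ms


def split_late(
    events: Iterable[Tuple[str, int, str, int]], allowed_lateness_ms: int = 0
) -> Tuple[List[Tuple[str, int, str]], List[Tuple[str, int, str]]]:
    """Each event is (key, ts_ms, payload, watermark_at_arrival); returns (on_time, side_output)."""
    on_time: List[Tuple[str, int, str]] = []
    side_output: List[Tuple[str, int, str]] = []
    for key, ts_ms, payload, watermark_ms in events:
        target = side_output if is_late(ts_ms, watermark_ms, allowed_lateness_ms) else on_time
        target.append((key, ts_ms, payload))
    return on_time, side_output
```

Since a late record's window has already fired, it presumably has no chance of being paired, so the side output could simply be logged alongside the orphans.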

Thanks and regards,
Averell


