I have been playing around with Flink for a few weeks to try to ascertain whether or not it meets our use cases, and also what best practices we should be following. I have a few questions I would appreciate answers to.

Our scenario is that we want to process a lot of event data into cases. A case is an in-order sequence of events; this event data could be quite old. We never know when a case is complete, so we just want to have the most up-to-date picture of what a case looks like.

The in-order sequence of events of a case is called the trace. Many cases could have an identical trace. We would like to construct these traces and do some aggregations on them (case count, average/min/max life-cycle time).

We then have further downstream processing to do on a case, some of which will require additional inputs, either from side inputs or from somehow joining data sources.

We don't really care about event time at the moment, because we just want to build cases and traces with all the data we have received.

The end results should be available to our web front end via a REST API.

Based on the above, I have the following idea for a first implementation:

Kafka source -> key by case id -> session window with RocksDB state backend holding the case for that key -> Postgres sink

The reason for a session window is that, as mentioned above, we just want to build a group with all the data we have received into Kafka up until that point in time. We would experiment with what this gap time should be, and in future it might be specific to the type of group, but for a start a naive approach is acceptable. I think this could be better than just doing it, say, every 10 minutes, because we really don't know yet the frequency of the data received. Also, some inputs to Kafka come directly from a CSV upload, so we will get "firehose" periods and periods of nothing. In short: I think what we have closely matches session behaviour.

We also have to implement a Postgres sink that is capable of doing upserts. The reason for Postgres is to serve the REST front end.

We then have to build our traces, and I can see three options for doing it:

1) The most obvious solution would be to use a Kafka sink for the keyed case stream, and to do the trace aggregations in a downstream Flink job with this Kafka topic as a source. However, I have some concerns about losing data (i.e. how do we know whether or not an event has been successfully pushed into the Kafka stream?).

2) Another approach might be to use some other type of sink (perhaps Postgres), and to use this as a source for the traces job. This would help us guarantee data consistency.

3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?), so:

Keyed cases stream -> broadcast -> key by trace hash with RocksDB state backend holding the trace for that trace hash -> perform aggregations -> Postgres sink

Is broadcast an option here? How costly is it?

Which of these approaches (or any other) would you recommend?

-------------------------------------

Another question, regarding state: as we never know when a case is complete, the RocksDB backend could grow infinitely (!). Obviously we would need to get a bit smarter here. Is using state in this way a somewhat standard practice, or is state intended more for recovery?

Managing growing state: I found some discussion regarding how to clear state here
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
which references https://issues.apache.org/jira/browse/FLINK-3946

Thanks,
Kat
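PS: for concreteness, the rough shape of the first job I have in mind (untested sketch; Event, EventDeserializationSchema, AssembleCaseWindowFunction and PostgresUpsertSink are placeholders I still have to write, and the 10 minute gap is just a first guess):

```java
import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class CaseAssemblyJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "case-assembly");

        DataStream<Event> events = env.addSource(
                new FlinkKafkaConsumer09<>("events", new EventDeserializationSchema(), kafkaProps));

        events
            .keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event e) {
                    return e.getCaseId();
                }
            })
            // processing-time sessions, since we don't care about event time yet
            .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
            // collects the events of the key into a Case, in arrival order
            .apply(new AssembleCaseWindowFunction())
            // custom sink doing upserts into Postgres
            .addSink(new PostgresUpsertSink());

        env.execute("case-assembly");
    }
}
```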
Hi Kat,

I did not understand the difference between a case and a trace.
If I got it right, the goal of your first job is to assemble the individual events into cases. Is a case here the last event for a case-id or all events of a case-id?
If a case is the collection of all events (which I assume), what is the difference to a trace, which is also the list of events (if I got it right)?

In any case, I think your first job can also be solved without a session window (which is quite complex internally). There are two options:

1) Use a global window [1] with a custom trigger that fires for each arriving record. A global window never ends, which would be OK since your cases do not end as well.

2) Use a MapFunction with key-partitioned state [2]. The map function would simply update the state for every new event and emit a new result.

Regarding your concerns about losing data when writing to Kafka: Flink's KafkaProducer provides at-least-once guarantees, which means that data might be written more than once in case of a failure but won't be lost. If the Kafka topic is partitioned by case-id and you only need the last record per case-id, Kafka's log compaction should give you upsert semantics.

Regarding your question "Is using state in this way a somewhat standard practice, or is state intended more for recovery?": Many streaming applications require state for their semantics (just like yours), i.e., they need to buffer data and wait for more data to arrive. In order to guarantee consistent result semantics of an application, the state must not be lost and must be recovered in case of a failure. So state is not intended for recovery, but recovery is needed to guarantee application semantics.

As I said before, I did not get the difference between cases and traces, so I cannot really comment on the job to analyze traces.

Hope this helps,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
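PS: a rough, untested sketch of option 2 as a RichFlatMapFunction with key/value state (Event and Case are placeholders for your types; the state descriptor constructor may differ slightly between Flink versions):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CaseAssembler extends RichFlatMapFunction<Event, Case> {

    private transient ValueState<Case> caseState;

    @Override
    public void open(Configuration parameters) {
        // keyed state: one Case per case-id, default value null
        caseState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("case", Case.class, null));
    }

    @Override
    public void flatMap(Event event, Collector<Case> out) throws Exception {
        Case current = caseState.value();
        if (current == null) {
            current = new Case(event.getCaseId());
        }
        current.addEvent(event);   // append the event in arrival order
        caseState.update(current);
        out.collect(current);      // emit the updated case for every new event
    }
}
```

The job would then roughly be kafkaSource.keyBy(caseId).flatMap(new CaseAssembler()).addSink(...), without any windowing.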
One thing to add: the Flink KafkaProducer provides at-least-once only if flush-on-checkpoint is enabled [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-
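For example (untested sketch; the topic name and CaseSerializationSchema are placeholders):

```java
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;

FlinkKafkaProducer09<Case> producer = new FlinkKafkaProducer09<>(
        "localhost:9092",                 // broker list
        "cases",                          // target topic (placeholder)
        new CaseSerializationSchema());   // placeholder serialization schema

// without this, records still buffered in the Kafka client can be lost on failure
producer.setFlushOnCheckpoint(true);

caseStream.addSink(producer);
```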
Hi Fabian,
A case consists of all events sharing the same case id. This id is what we initially key the stream by. The order of these events is the trace.

For example:

caseid: case1, consisting of event1, event2, event3. Start time 11:00, end 11:05, run time 5 minutes
caseid: case12, consisting of event1, event2, event3. Start time 11:00, end 11:15, run time 15 minutes

These are 2 distinct cases with the same trace (event1, event2, event3). This trace would have 2 occurrences, with a min run time of 5 minutes, a max of 15 and an average of 10.

I have implemented your 2nd suggestion for the first job. I hope I have made the traces clearer, as I am still unsure of the best approach there.

Thanks a lot,
Kat
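PS: roughly what I am picturing for the traces job, ignoring for now the problem that a case can move to a different trace hash as new events arrive (untested sketch; Case and TraceStats are placeholder types):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Aggregates per-trace statistics from the stream of (updated) cases.
public class TraceAggregator extends RichFlatMapFunction<Case, TraceStats> {

    private transient ValueState<TraceStats> statsState;

    @Override
    public void open(Configuration parameters) {
        statsState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("trace-stats", TraceStats.class, null));
    }

    @Override
    public void flatMap(Case c, Collector<TraceStats> out) throws Exception {
        TraceStats stats = statsState.value();
        if (stats == null) {
            stats = new TraceStats(c.getTraceHash());
        }
        // update count, min/max/sum of run time; average = sum / count
        stats.add(c.getRunTimeMillis());
        statsState.update(stats);
        out.collect(stats);
    }
}
```

This would be keyed by the trace hash, i.e. caseStream.keyBy(traceHash).flatMap(new TraceAggregator()), with the output upserted into Postgres by trace hash.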
Hi Kat,

thanks for the clarification about cases and traces.

A case (id: 1, event-1, event-3) would result in a trace (event-1, event-3) and go into the corresponding aggregates (e.g., increment a count by 1). You might want to write the result into some kind of datastore with a primary key (trace-hash) to be able to update the results in place (updates caused by retractions and updated trace records). This could be a relational DB (Postgres) or a compacted Kafka topic.

Hope this helps.

Best,
Fabian
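A minimal sketch of such an in-place update into Postgres, assuming Postgres 9.5+ for ON CONFLICT (untested; the table, columns, connection details, and the TraceStats type are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class TraceStatsUpsertSink extends RichSinkFunction<TraceStats> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/traces", "user", "password");
        upsert = connection.prepareStatement(
                "INSERT INTO trace_stats (trace_hash, case_count, min_ms, max_ms, avg_ms) " +
                "VALUES (?, ?, ?, ?, ?) " +
                "ON CONFLICT (trace_hash) DO UPDATE SET " +
                "case_count = EXCLUDED.case_count, min_ms = EXCLUDED.min_ms, " +
                "max_ms = EXCLUDED.max_ms, avg_ms = EXCLUDED.avg_ms");
    }

    @Override
    public void invoke(TraceStats stats) throws Exception {
        // write the latest aggregate for this trace hash, inserting or updating in place
        upsert.setString(1, stats.getTraceHash());
        upsert.setLong(2, stats.getCaseCount());
        upsert.setLong(3, stats.getMinMillis());
        upsert.setLong(4, stats.getMaxMillis());
        upsert.setLong(5, stats.getAvgMillis());
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) { upsert.close(); }
        if (connection != null) { connection.close(); }
    }
}
```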