Strategies for Complex Event Processing with guaranteed data consistency

Strategies for Complex Event Processing with guaranteed data consistency

Kathleen Sharp
I have been playing around with Flink for a few weeks to try to
ascertain whether or not it meets our use cases, and also what best
practices we should be following. I have a few questions I would
appreciate answers to.


Our scenario is that we want to process a lot of event data into
cases. A case is an in-order sequence of events; this event data could
be quite old. We never know when a case is complete, so we just want
to have the most up-to-date picture of what a case looks like.


The in-order sequence of events of a case is called the trace. Many
cases could have an identical trace. We would like to construct these
traces and do some aggregations on them (case count, average/min/max
life-cycle time).


We then have further downstream processing we will do on a case, some
of which would require additional inputs, either from side inputs or
by somehow joining data sources.


We don’t really care about event time at the moment, because we just
want to build cases and traces with all the data we have received.


The end results should be available to our web front end via a REST API.


Based on the above I have the following idea for a first implementation:


Kafka source -> key by case id -> session window with RocksDB state
backend holding the case for that key -> Postgres sink


The reason for a session window is that, as I mentioned above, we just
want to build a group with all the data we have received into Kafka up
until that point in time. We would experiment with what this gap time
should be, and in future it might be specific to the type of group,
but a naive approach is acceptable to start with. I think this could
be better than just doing it, say, every 10 minutes, because we really
don’t know yet the frequency of the data received. Also, some inputs
to Kafka come directly from a CSV upload, so we will get “firehose”
periods, and periods of nothing.

In short: I think what we have closely matches session behaviour.
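
Roughly, I imagine the job looking something like this (just a sketch;
the topic name, schema and helper classes like Event,
EventDeserializationSchema, BuildCaseWindowFunction and
PostgresUpsertSink are placeholders):

import java.util.Properties;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class CaseBuilderJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every minute
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "case-builder");

        DataStream<Event> events = env.addSource(
            new FlinkKafkaConsumer09<>("events", new EventDeserializationSchema(), kafkaProps));

        events
            .keyBy(e -> e.caseId)
            .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) // gap to be tuned
            .apply(new BuildCaseWindowFunction()) // folds the window's events into a Case
            .addSink(new PostgresUpsertSink());   // upsert sink, see below

        env.execute("case-builder");
    }
}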


We also have to implement a Postgres sink that is capable of doing
upserts. The reason for Postgres is to serve the REST front end.
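
A minimal sketch of such a sink with plain JDBC, using Postgres'
INSERT ... ON CONFLICT upsert (needs Postgres 9.5+; the connection
details, table layout and the Case type are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PostgresUpsertSink extends RichSinkFunction<Case> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/cases", "flink", "secret");
        upsert = connection.prepareStatement(
            "INSERT INTO cases (case_id, trace) VALUES (?, ?) "
            + "ON CONFLICT (case_id) DO UPDATE SET trace = EXCLUDED.trace");
    }

    @Override
    public void invoke(Case value) throws Exception {
        upsert.setString(1, value.caseId);
        upsert.setString(2, value.traceAsString()); // placeholder serialisation
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) upsert.close();
        if (connection != null) connection.close();
    }
}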


We then have to build our traces, and I can see three options for it:


1) The most obvious solution would be to use a Kafka sink for the
keyed case stream, and to do the trace aggregations in a downstream
Flink job with this Kafka topic as a source. However, I have some
concerns over losing data (i.e. how do we know whether or not an
event has been successfully pushed into the Kafka stream?).


2) Another approach might be to use some other type of sink (perhaps
Postgres), and to use this as a source for the traces job. This would
help us guarantee data consistency.


3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?), so:

Keyed cases stream -> broadcast -> key by tracehash with RocksDB
state backend holding the trace for that tracehash -> perform
aggregations -> Postgres sink

Is broadcast an option here? How costly is it?
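
For illustration, what I have in mind might reduce to a plain second
keyBy if a hash repartition is enough (a sketch only; TraceAggregator
and the traceHash field are made up):

// cases is the keyed case stream produced by the first part of the job
cases
    .keyBy(c -> c.traceHash)        // re-distributes the cases by trace hash
    .map(new TraceAggregator())     // keyed RocksDB state holds per-trace aggregates
    .addSink(new PostgresUpsertSink());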


Which of these approaches (or any other), would you recommend?


-------------------------------------

Another question regarding the state:

As we never know when a case is complete, the RocksDB
backend could grow without bound (!). Obviously we would need to get a
bit smarter here.


Is using state in this way a somewhat standard practice, or is state
intended more for recovery?

Managing growing state: I found some discussion regarding how to clear
state here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
which references https://issues.apache.org/jira/browse/FLINK-3946

Thanks,

Kat

Re: Strategies for Complex Event Processing with guaranteed data consistency

Fabian Hueske-2
Hi Kat,

I did not understand the difference between a case and a trace.
If I got it right, the goal of your first job is to assemble the individual events into cases. Is a case here the last event for a case-id, or all events of a case-id?
If a case is the collection of all events (which I assume), what is the difference to a trace, which is also the list of events (if I got that right)?

In any case, I think your first job can also be solved without a session window (which is quite complex internally).
There are two options (sketched below):
1) Use a global window [1] with a custom trigger that fires for each arriving record. A global window never ends, which would be OK since your cases do not end either.
2) Use a MapFunction with key-partitioned operator state [2]. The map function would simply update the state for every new event and emit a new result.
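
Minimal sketches of both options (the Event/Case types and their
helpers are placeholders, and the exact Trigger signatures vary a bit
across Flink versions):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Option 1: a trigger that fires the global window on every element.
public class FireOnEveryElement extends Trigger<Object, GlobalWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE; // emit the updated window contents right away
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        // nothing to clean up; the window contents are managed by Flink
    }
}

// usage:
// events.keyBy(e -> e.caseId)
//       .window(GlobalWindows.create())
//       .trigger(new FireOnEveryElement())
//       .apply(new BuildCaseWindowFunction());

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Option 2: a map function with keyed state holding the case built so far.
public class BuildCase extends RichMapFunction<Event, Case> {

    private transient ValueState<Case> caseState;

    @Override
    public void open(Configuration parameters) {
        caseState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("case", Case.class, null));
    }

    @Override
    public Case map(Event event) throws Exception {
        Case current = caseState.value();
        if (current == null) {
            current = new Case(event.caseId);
        }
        current.append(event); // placeholder: keeps the events in order
        caseState.update(current);
        return current;        // emit the updated case after every event
    }
}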

Regarding your concerns about losing data when writing to Kafka: Flink's KafkaProducer provides at-least-once guarantees, which means that data might be written more than once in case of a failure but won't be lost. If the Kafka topic is partitioned by case-id and you only need the last record per case-id, Kafka's log compaction should give you upsert semantics.

Regarding your question "Is using state in this way a somewhat standard practice, or is state intended more for recovery?":
Many streaming applications require state for their semantics (just like yours), i.e., they need to buffer data and wait for more data to arrive. To guarantee consistent result semantics of an application, the state must not be lost and must be recovered in case of a failure. So state is not intended for recovery; rather, recovery is needed to guarantee application semantics.

As I said before, I did not get the difference between cases and traces, so I cannot really comment on the job to analyze traces.

Hope this helps,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface


Re: Strategies for Complex Event Processing with guaranteed data consistency

Fabian Hueske-2

One thing to add: the Flink KafkaProducer provides only at-least-once if
flush-on-checkpoint is enabled [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-
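
With the 0.9 connector, for example, that would look like this (the
topic name and schema are just placeholders):

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer09<String> producer =
    new FlinkKafkaProducer09<>("cases", new SimpleStringSchema(), props);

// Without this, records still buffered in the Kafka client when a
// checkpoint completes are not covered by it and can be lost on failure.
producer.setFlushOnCheckpoint(true);

caseStream.addSink(producer);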



Re: Strategies for Complex Event Processing with guaranteed data consistency

Kathleen Sharp
Hi Fabian,

A case consists of all events sharing the same case id. This id is
what we initially key the stream by.

The order of these events is the trace.

For example:
caseid: case1, consisting of event1, event2, event3. Start time 11:00,
end 11:05, run time 5 minutes.
caseid: case12, consisting of event1, event2, event3. Start time 11:00,
end 11:15, run time 15 minutes.

These are 2 distinct cases with the same trace (event1, event2,
event3). This trace would have 2 occurrences, with a min run time of 5
minutes, max of 15 and an average of 10.

I have implemented your second suggestion for the first job. I hope I
have made the traces clearer, as I am still unsure of the best
approach there.

Thanks a lot,
Kat





Re: Strategies for Complex Event Processing with guaranteed data consistency

Fabian Hueske-2
Hi Kat,

thanks for the clarification about cases and traces.

Regarding the aggregation of traces: you can either do that in the same job that constructs the cases, or in a job which is decoupled via, for instance, Kafka.
If I got your requirements right, you need a mechanism for retraction.
A case (id: 1, event-1, event-3) would result in a trace (event-1, event-3) and go into the corresponding aggregates (e.g., increment a count by 1).
If the case with id: 1 receives another event (say event-2), its trace changes to (event-1, event-3, event-2), so the counter of (event-1, event-3) needs to be decreased and the counter of the new trace (event-1, event-3, event-2) increased.

You can solve this by keeping the latest version of each case in keyed state (keyBy(case-id).flatMap()). Whenever an update is received, you have to send out a retraction record for the old trace and an update record for the new trace. The aggregation would again be done on keyed state (keyBy(trace-hash).map()).
You might want to write the result into some kind of datastore with a primary key (trace-hash) to be able to update the results in place (updates caused by retraction and update trace records). This could be a relational DB (Postgres) or a compacted Kafka topic.
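
A rough sketch of the first step, tracking only the count aggregate
(retracting min/max correctly needs more bookkeeping); Case and
TraceUpdate are placeholder types:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Runs on the stream keyed by case-id; remembers the trace hash of the
// previously emitted version of each case.
public class TraceRetraction extends RichFlatMapFunction<Case, TraceUpdate> {

    private transient ValueState<String> lastTraceHash;

    @Override
    public void open(Configuration parameters) {
        lastTraceHash = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-trace-hash", String.class, null));
    }

    @Override
    public void flatMap(Case c, Collector<TraceUpdate> out) throws Exception {
        String previous = lastTraceHash.value();
        if (previous != null && !previous.equals(c.traceHash)) {
            out.collect(new TraceUpdate(previous, -1));    // retract the old trace
        }
        if (previous == null || !previous.equals(c.traceHash)) {
            out.collect(new TraceUpdate(c.traceHash, +1)); // register the new trace
            lastTraceHash.update(c.traceHash);
        }
    }
}

// usage:
// cases.keyBy(c -> c.caseId)
//      .flatMap(new TraceRetraction())
//      .keyBy(u -> u.traceHash)
//      .map(new CountPerTrace())           // keyed state: running count per trace hash
//      .addSink(new PostgresUpsertSink()); // primary key = trace hash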

Hope this helps.

Best, Fabian




