FlinkCEP questions - architecture

Juergen Donnerstag
Hi,

We're in the very early stages of evaluating options. I'm not a Flink expert, but I have read some of the docs and watched videos. Could you please help me understand whether and how certain of our requirements are covered by Flink (CEP)? Is this mailing list the right channel for such questions?

1) We receive files every day, which are exports from some database tables, containing ONLY the changes from that day. Most tables have modify-cols. Even though they arrive as files, because they contain changes only, I believe the file records should be considered events in Flink terminology. Is that assumption correct?

2) The records within the DB export files are NOT in chronological order, and we cannot change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP even though the events/records are not in chronological order within the file? The files are 100 MB to 20 GB in size. Do I need to sort the files before CEP processing?

3) Occasionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for the respective day (e.g. last week's Tuesday). Consequently we receive a correction file: same filename but with "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to, which we already did)? Regular checkpoints and re-processing all files since then? What happens to the CEP state? Will it be checkpointed as well?

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

5) We also have CEP rules that must fire if, after a start sequence matched, the remaining sequence did NOT occur within a configured window. E.g. if A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.

6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formula considering the number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...?

7) I can imagine that for debugging purposes it would be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state, and how can we query it? And does such a query work on the whole cluster or only per node (e.g. because of the shuffle and nodes being responsible for only a portion of the events)?

8) I understand state is stored per node. What happens if I want to add or remove nodes? Will the state still be found, despite it being stored on another node? I read that I need to be equally careful when changing rules. Or is that a different issue?

9) How does garbage collection of temporary CEP state work, or will it stay forever? For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. Maybe 30 days later or so.

10) Are there strategies to minimize temporary CEP state? In SQL queries you filter first on the "smallest" attributes. CEP rules form a sequence, hence that approach will not work. Is that an issue at all? What are the practical limits of the CEP temporary state storage engine?

11) Occasionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filters all relevant events of a sequence and updates state, and in step 2 - after the files are processed - evaluates whether the updated state matches the sequences.

12) Schema changes in the input files: Occasionally the DB source system schema is changed, and not always in a backwards-compatible way (new fields are inserted in the middle), and the export will also have the field in the middle. This means that starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month because of corrections provided. And if the file format has changed somewhere in the middle ...

Thanks a lot for your time and your help
Juergen

Re: FlinkCEP questions - architecture

Arvid Heise-3
Hi Juergen,

1) Yes, you are using a changelog of events. If you need more information, you could search for "change data capture" architecture.

For all CEP questions, I'm pulling in Kostas.

12) It depends on the format in which the data is exported. If you use a format that supports schema evolution (e.g. Avro), then schema changes will be handled gracefully.

Best,

Arvid

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag <[hidden email]> wrote:

Re: FlinkCEP questions - architecture

Kostas Kloudas-2
In reply to this post by Juergen Donnerstag
Hi Juergen,

I will reply to your questions inline. As a general comment, I would
suggest also having a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY the changes from that day. Most tables have
modify-cols. Even though they arrive as files, because they contain
changes only, I believe the file records should be considered events
in Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronological
order, and we cannot change the export. Our use case is a "complex
event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If
first A, then B, then C within 30 days, then do something". Does that
work with FlinkCEP even though the events/records are not in
chronological order within the file? The files are 100 MB to 20 GB in
size. Do I need to sort the files before CEP processing?

-> Flink CEP also works in event time, and the re-ordering can be done
by Flink.
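
To illustrate what event-time re-ordering means in general, here is a minimal Python sketch (not Flink code): out-of-order records are buffered and only released, sorted by timestamp, once a watermark has passed them. The names `ReorderBuffer`, `add` and `advance_watermark` are hypothetical and exist only for this illustration.

```python
import heapq
from typing import Any, List, Tuple


class ReorderBuffer:
    """Buffers out-of-order events and releases them sorted by timestamp."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._seq = 0  # tie-breaker so equal timestamps keep arrival order

    def add(self, timestamp: int, event: Any) -> None:
        heapq.heappush(self._heap, (timestamp, self._seq, event))
        self._seq += 1

    def advance_watermark(self, watermark: int) -> List[Tuple[int, Any]]:
        """Release every buffered event with timestamp <= watermark, in order."""
        released = []
        while self._heap and self._heap[0][0] <= watermark:
            ts, _, event = heapq.heappop(self._heap)
            released.append((ts, event))
        return released


buf = ReorderBuffer()
for ts, name in [(3, "C"), (1, "A"), (2, "B"), (5, "D")]:
    buf.add(ts, name)

print(buf.advance_watermark(3))  # events up to timestamp 3, sorted
print(buf.advance_watermark(9))  # the remaining event
```

Note that a watermark can only safely move past a timestamp once no earlier record can still arrive, so with completely unsorted files the buffer may have to hold a large share of a file before anything is released.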

3) Occasionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for the respective day (e.g. last week's Tuesday).
Consequently we receive a correction file: same filename but with "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we already did)? Regular checkpoints and
re-processing all files since then? What happens to the CEP state?
Will it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.
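
For the re-processing option, the set of files to replay after restoring an earlier state could be selected roughly as in the Python sketch below. The naming scheme (`<table>_<YYYY-MM-DD>.csv`, with `_<n>` before the extension for corrections) and the function name `files_to_reprocess` are assumptions made for illustration; the thread only says that "_1" is appended and that the date is part of the filename.

```python
import re
from typing import Dict, List, Tuple

# Hypothetical naming scheme: "<table>_<YYYY-MM-DD>.csv" for the original export
# and "<table>_<YYYY-MM-DD>_<n>.csv" for the n-th correction of that day.
FILE_RE = re.compile(
    r"^(?P<table>\w+?)_(?P<date>\d{4}-\d{2}-\d{2})(?:_(?P<rev>\d+))?\.csv$"
)


def files_to_reprocess(filenames: List[str], corrected_date: str) -> List[str]:
    """Return, in date order, the newest revision of every file dated on or
    after `corrected_date` (ISO dates compare correctly as strings)."""
    newest: Dict[Tuple[str, str], Tuple[int, str]] = {}
    for name in filenames:
        m = FILE_RE.match(name)
        if m is None:
            continue  # ignore files that do not follow the assumed scheme
        key = (m["date"], m["table"])
        rev = int(m["rev"] or 0)
        if key not in newest or rev > newest[key][0]:
            newest[key] = (rev, name)
    return [newest[k][1] for k in sorted(newest) if k[0] >= corrected_date]


files = [
    "orders_2020-02-03.csv",
    "orders_2020-02-04.csv",
    "orders_2020-02-04_1.csv",
    "orders_2020-02-05.csv",
]
print(files_to_reprocess(files, "2020-02-04"))
```

The job would then be restored from a checkpoint or savepoint taken before the corrected day and fed this list, so the corrected file replaces the original one in the replay.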

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.

5) We also have CEP rules that must fire if, after a start sequence
matched, the remaining sequence did NOT occur within a configured
window. E.g. if A, then B, but C did not occur within 30 days since A.
Is that supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.
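
As a plain illustration of the rule semantics in question ("A, then B, but no C within 30 days of A"), here is a minimal Python sketch for a single key. It is not FlinkCEP code; whether the same result is best expressed in FlinkCEP through the pattern combinations in [1] or through the library's handling of timed-out partial matches should be checked against the documentation. The function name `find_missing_c` and the use of seconds as time unit are assumptions.

```python
from typing import List, Tuple

DAY = 24 * 60 * 60  # seconds


def find_missing_c(events: List[Tuple[int, str]], watermark: int,
                   window: int = 30 * DAY) -> List[int]:
    """For one key: return the timestamps of every A that was followed by B
    but not by a later C within `window` seconds of A. `events` must be
    sorted by timestamp; only windows already closed at `watermark` are
    reported, because C could still arrive in an open window."""
    timed_out = []
    for i, (a_ts, kind) in enumerate(events):
        if kind != "A":
            continue
        deadline = a_ts + window
        if deadline > watermark:
            continue  # window still open
        seen_b = False
        seen_c_after_b = False
        for ts, k in events[i + 1:]:
            if ts > deadline:
                break
            if k == "B":
                seen_b = True
            elif k == "C" and seen_b:
                seen_c_after_b = True
                break
        if seen_b and not seen_c_after_b:
            timed_out.append(a_ts)
    return timed_out


late_c = [(0, "A"), (5 * DAY, "B"), (40 * DAY, "C")]
print(find_missing_c(late_c, watermark=60 * DAY))  # C came too late for the A at 0
```

The key point is that the absence of C can only be reported once event time has passed the end of the window, which is why such rules depend on watermarks making progress.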

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formula
considering the number of rules, number of records per file or day,
record size, window, number of records matched per sequence, number of
keyBy grouping keys, ...?

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with its
own state. This can become heavy, but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes has to keep more records in state than will be
presented at the end.
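
While no exact formula exists, a back-of-the-envelope figure for the retained event payload can still help with a first sizing. The Python sketch below assumes, pessimistically, that every pattern operator keeps each relevant event for the whole window; the function name and all workload numbers are hypothetical, and partial-match bookkeeping and serialization overhead are ignored, so the real number can differ considerably in either direction.

```python
def estimate_retained_event_bytes(events_per_day: int, avg_event_bytes: int,
                                  window_days: int, num_patterns: int,
                                  relevant_fraction: float = 1.0) -> int:
    """Order-of-magnitude estimate: each pattern operator keeps every
    relevant event for the whole window. Ignores serialization overhead,
    match bookkeeping and state-backend compression."""
    retained_events = events_per_day * window_days * relevant_fraction
    return int(retained_events * avg_event_bytes * num_patterns)


# Hypothetical workload: 2 million events/day, 200 bytes each, 180-day window,
# 40 patterns, 10% of events relevant to any given pattern.
estimate = estimate_retained_event_bytes(2_000_000, 200, 180, 40, 0.10)
print(f"{estimate / 1e9:.0f} GB")  # prints "288 GB"
```

The factor `num_patterns` is what the one-operator-per-pattern design contributes; the window length and the fraction of relevant events are the other levers.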

Have you considered going with a solution based on ProcessFunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime, and it will allow you
to do any optimizations specific to your workload ;) For a discussion
on this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator, thus potentially minimizing the
number of copies of the state you keep.
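
The multiplexing idea can be sketched in plain Python as follows. This is not the implementation from [3] and uses no Flink API; `MultiplexedMatcher` and its methods are hypothetical names. The rules dictionary plays the role of the broadcast side, the per-key event list plays the role of keyed state, and each event is stored once regardless of how many rules look at it. Pruning of events older than the longest window is omitted for brevity.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class MultiplexedMatcher:
    """One operator evaluating many sequence rules over a single per-key
    event buffer, instead of one buffer per rule."""

    def __init__(self) -> None:
        self.rules: Dict[str, Tuple[List[str], int]] = {}  # "broadcast" side
        self.events: Dict[str, List[Tuple[int, str]]] = defaultdict(list)  # keyed side

    def add_rule(self, rule_id: str, sequence: List[str], window: int) -> None:
        """Rules can be added while the operator is running."""
        self.rules[rule_id] = (sequence, window)

    def on_event(self, key: str, ts: int, kind: str) -> List[str]:
        """Store the event once, then return the ids of all rules whose
        sequence is completed by it. Events must arrive in timestamp order."""
        buffer = self.events[key]
        buffer.append((ts, kind))
        return [rid for rid, (seq, window) in self.rules.items()
                if seq[-1] == kind and self._matches(buffer, seq, ts - window)]

    @staticmethod
    def _matches(buffer, seq, earliest_ts) -> bool:
        # Greedy subsequence check over the events inside the window.
        pos = 0
        for ts, kind in buffer:
            if ts >= earliest_ts and kind == seq[pos]:
                pos += 1
                if pos == len(seq):
                    return True
        return False


m = MultiplexedMatcher()
m.add_rule("r1", ["A", "B", "C"], window=30)
m.add_rule("r2", ["A", "C"], window=10)
for ts, kind in [(0, "A"), (5, "B"), (20, "C")]:
    print(ts, kind, m.on_event("customer-1", ts, kind))
```

In this run only `r1` fires on the final C, because the A at timestamp 0 lies outside the 10-unit window of `r2`.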

7) I can imagine that for debugging purposes it would be good if we
were able to query the temporary CEP state. What is the (CEP) schema
used to persist the CEP state, and how can we query it? And does such
a query work on the whole cluster or only per node (e.g. because of
the shuffle and nodes being responsible for only a portion of the
events)?

-> Unfortunately the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove nodes? Will the state still be found, despite it being
stored on another node? I read that I need to be equally careful when
changing rules. Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a
savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a savepoint, remove the old pattern and add a new one (the
updated one), and tell Flink to ignore the state of the previous
operator (as said earlier, each CEP pattern is translated to an
operator).
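
As background on why keyed state can be found again after relaunching with a different parallelism: Flink organizes keyed state into key groups, whose number is fixed by the job's maximum parallelism, and on restore each parallel instance receives the key groups it is now responsible for. The Python sketch below is a simplified illustration of that mapping; `zlib.crc32` is only a stand-in for Flink's own hash function, and the function names are hypothetical.

```python
import zlib


def key_group(key: str, max_parallelism: int) -> int:
    # Stand-in hash; Flink applies its own hash function to the key's hash code.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def subtask_for(key_group_id: int, parallelism: int, max_parallelism: int) -> int:
    # Contiguous ranges of key groups are assigned to each parallel subtask.
    return key_group_id * parallelism // max_parallelism


MAX_PARALLELISM = 128
for key in ["customer-1", "customer-2", "customer-3"]:
    kg = key_group(key, MAX_PARALLELISM)
    print(key, "key group", kg,
          "| subtask at parallelism 4:", subtask_for(kg, 4, MAX_PARALLELISM),
          "| at parallelism 8:", subtask_for(kg, 8, MAX_PARALLELISM))
```

A key's key group never changes as long as the maximum parallelism stays the same; only the assignment of key groups to subtasks changes, which is what makes redistributing state from a savepoint possible.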

9) How does garbage collection of temporary CEP state work, or will it
stay forever? For tracing/investigation reasons I can imagine that
purging it at the earliest possible time is not always the best
option. Maybe 30 days later or so.

-> CEP cleans up state after the time horizon (specified with the
.within() clause) expires.

10) Are there strategies to minimize temporary CEP state? In SQL
queries you filter first on the "smallest" attributes. CEP rules form
a sequence, hence that approach will not work. Is that an issue at
all? What are the practical limits of the CEP temporary state storage
engine?

-> Such optimizations are not supported out of the box. I would
recommend going with the broadcast state approach in [3].

11) Occasionally we need to process about 200 files at once. Can I
speed things up by processing all files in parallel on multiple nodes,
despite their sequence (CEP use case)? This would only work if
FlinkCEP in step 1 simply filters all relevant events of a sequence
and updates state, and in step 2 - after the files are processed -
evaluates whether the updated state matches the sequences.

12) Schema changes in the input files: Occasionally the DB source
system schema is changed, and not always in a backwards-compatible way
(new fields are inserted in the middle), and the export will also have
the field in the middle. This means that starting from a specific
(file) date, I need to consider a different schema. This must also be
handled when re-running files for the last month because of
corrections provided. And if the file format has changed somewhere in
the middle ...

-> This seems to be relevant for the "data cleaning" phase, before you
send your data to CEP. In this case, if the schema changes, then I
assume that you need to update your initial parsing logic, which means
taking a savepoint and redeploying the updated jobGraph with the new
input parsing logic (if I understand correctly).
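
Since the file date is known at parsing time, one possible shape for such parsing logic is to pick the column layout by date, so that re-running older files keeps using the layout they were written with. The Python sketch below assumes headerless CSV input; the schema history, column names and function names are hypothetical.

```python
import csv
import io
from datetime import date
from typing import Dict, List, Tuple

# Hypothetical schema history, sorted by start date:
# (first file date the layout applies to, column names).
SCHEMAS: List[Tuple[date, List[str]]] = [
    (date(2019, 1, 1), ["id", "status", "modified_at"]),
    (date(2020, 2, 1), ["id", "region", "status", "modified_at"]),  # field inserted mid-row
]


def schema_for(file_date: date) -> List[str]:
    """Pick the newest layout whose start date is not after the file date."""
    applicable = [cols for start, cols in SCHEMAS if start <= file_date]
    if not applicable:
        raise ValueError(f"no schema known for {file_date}")
    return applicable[-1]


def parse(file_date: date, text: str) -> List[Dict[str, str]]:
    """Parse headerless CSV text into dicts keyed by column name."""
    columns = schema_for(file_date)
    return [dict(zip(columns, row)) for row in csv.reader(io.StringIO(text))]


print(parse(date(2020, 1, 15), "7,open,2020-01-15"))
print(parse(date(2020, 2, 10), "7,EU,open,2020-02-10"))
```

Downstream operators then see records keyed by field name rather than by position, so a field inserted in the middle does not shift the meaning of the other columns.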

thanks a lot for your time and your help

I hope the above helps!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
[3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html


Re: FlinkCEP questions - architecture

Juergen Donnerstag
thanks a lot
Juergen

On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <[hidden email]> wrote:
Hi Juergen,

I will reply to your questions inline. As a general comment I would
suggest to also have a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY changes from the day. Most tables have
modify-cols. Even though they are files but because they contain
changes only, I belief the file records shall be considered events in
Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronologically,
and we can not change the export. Our use case is a "complex event
processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
A, then B, then C within 30 days, then do something". Does that work
with FlinkCEP despite the events/records are not in chrono order
within the file? The files are 100MB to 20GB in size. Do I need to
sort the files first before CEP processing?

-> Flink CEP also works in event time and the re-ordering can be done by Flink

3) Occassionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for that respective day (e.g. last weeks Tuesday).
Consequently we receive a correction file. Same filename but "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we did already). Regular checkpoints and
re-process all files since then?  What happens to the CEP state? Will
it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.

4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.

5) We also have CEP rules that must fire if after a start sequence
matched, the remaining sequence did NOT within a configured window.
E.g. If A, then B, but C did not occur within 30 days since A. Is that
supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formular
considering number of rules, number of records per file or day, record
size, window, number of records matched per sequence, number of keyBy
grouping keys, ...

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with each
own state. This can become heavy but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes has to keep also in state more records than will be
presented at the end.

Have you considered going with a solution based on processfunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime and it will allow you to
do any optimizations specific to your workload ;) For a discussion on
this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator thus potentially minimizing the amount
of copies of the state you keep.

7) I can imagine that for debugging reasons it'd be good if we were
able to query the temporary CEP state. What is the (CEP) schema used
to persist the CEP state and how can we query it? And does such query
work on the whole cluster or only per node (e.g. because of shuffle
and nodes responsible only for a portion of the events).

-> Unfortunatelly the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove a nodes. Will the state still be found, despite it being
stored in another node? I read that I need to be equally careful when
changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a
savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a savepoint, remove the old pattern and add a new one (the
updated one) and tell Flink to ignore the state of the previous
operator (as said earlier each CEP pattern is translated to an
operator).

9) How does garbage collection of temp CEP state work, or will it stay
forever?  For tracing/investigation reasons I can imagine that purging
it at the earliest possible time is not always the best option. May be
after 30 days later or so.

-> CEP clean state after the time horizon (specified with the
.within() clause) expires.

10) Are there strategies to minimize temp CEP state? In SQL queries
you  filter first on the "smallest" attributes. CEP rules form a
sequence. Hence that approach will not work. Is that an issue at all?
What are practical limits on the CEP temp state storage engine?

-> Such optimizations are not supported out of the box. I would
recommend to go with the Broadcast state approach in [3].

11) Occassionally we need to process about 200 files at once. Can I
speed things up by processing all files in parallel on multiple nodes,
despite their sequence (CEP use case)? This would only work if
FlinkCEP in step 1 simply filters on all relevant events of a
sequence, updates state, and in a step 2 - after the files are
processed - evaluates the updated state if that meets the sequences.

12) Schema changes in the input files: Occassionly the DB source
system schema is changed, and not always in a backwards compatible way
(insert new fields in the middle), and also the export will have the
field in the middle. This means that starting from a specific (file)
date, I need to consider a different schema. This must also be handled
when re-running files for the last month, because of corrections
provided. And if the file format has changed someone in the middle ...

-> This seems to be relevant for the "data cleaning" phase, before you
send your data to CEP. In this case, if the schema changes, then I
assume that you need to update your initial parsing logic, which means
taking a savepoint and redeploying the updated jobGraph with the new
input parsing logic (if I understand correctly).

thanks a lot for your time and your help

I hope that above helps!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
[3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
<[hidden email]> wrote:
>
> Hi,
>
> we're in very early stages evaluating options. I'm not a Flink expert, but did read some of the docs and watched videos. Could you please help me understand if and how certain of our reqs are covered by Flink (CEP). Is this mailing list the right channel for such questions?
>
> 1) We receive files every day, which are exports from some database tables, containing ONLY changes from the day. Most tables have modify-cols. Even though they are files but because they contain changes only, I belief the file records shall be considered events in Flink terminology. Is that assumption correct?
>
> 2) The records within the DB export files are NOT in chronologically, and we can not change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP despite the events/records are not in chrono order within the file? The files are 100MB to 20GB in size. Do I need to sort the files first before CEP processing?
>
> 3) Occassionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for that respective day (e.g. last weeks Tuesday). Consequently we receive a correction file. Same filename but "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to, which we did already). Regular checkpoints and re-process all files since then?  What happens to the CEP state? Will it be checkpointed as well?
>
> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>
> 5) We also have CEP rules that must fire if after a start sequence matched, the remaining sequence did NOT within a configured window. E.g. If A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formular considering number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...
>
> 7) I can imagine that for debugging reasons it'd be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state and how can we query it? And does such query work on the whole cluster or only per node (e.g. because of shuffle and nodes responsible only for a portion of the events).
>
> 8) I understand state is stored per node. What happens if I want to add or remove a nodes. Will the state still be found, despite it being stored in another node? I read that I need to be equally careful when changing rules? Or is that a different issue?
>
> 9) How does garbage collection of temp CEP state work, or will it stay forever?  For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. May be after 30 days later or so.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries you  filter first on the "smallest" attributes. CEP rules form a sequence. Hence that approach will not work. Is that an issue at all? What are practical limits on the CEP temp state storage engine?
>
> 11) Occassionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filters on all relevant events of a sequence, updates state, and in a step 2 - after the files are processed - evaluates the updated state if that meets the sequences.
>
> 12) Schema changes in the input files: Occassionly the DB source system schema is changed, and not always in a backwards compatible way (insert new fields in the middle), and also the export will have the field in the middle. This means that starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month, because of corrections provided. And if the file format has changed someone in the middle ...
>
> thanks a lot for your time and your help
> Juergen

Re: FlinkCEP questions - architecture

Oytun Tez
Amazing content, thanks for asking and answering. 

On Fri, Feb 21, 2020 at 5:04 AM Juergen Donnerstag <[hidden email]> wrote:
thanks a lot
Juergen

On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <[hidden email]> wrote:
Hi Juergen,

I will reply to your questions inline. As a general comment, I would
suggest also having a look at [3] so that you have an idea of some of
the alternatives.
With that said, here come the answers :)

1) We receive files every day, which are exports from some database
tables, containing ONLY changes from the day. Most tables have
modify-cols. Even though they are files, because they contain
changes only, I believe the file records should be considered events in
Flink terminology. Is that assumption correct?

-> Yes. I think your assumption is correct.

2) The records within the DB export files are NOT in chronological
order, and we cannot change the export. Our use case is a "complex event
processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first
A, then B, then C within 30 days, then do something". Does that work
with FlinkCEP even though the events/records are not in chronological
order within the file? The files are 100MB to 20GB in size. Do I need to
sort the files first before CEP processing?

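-> Flink CEP also works in event time, and the re-ordering can be done
by Flink: in event time, the CEP operator buffers incoming records and
processes them in timestamp order once the watermark has passed them.
So you should not need to sort the files first, as long as your
watermarks account for how far out of order the records can be.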
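
To make the re-ordering idea concrete, here is a minimal plain-Python
sketch (not Flink code). It assumes events are (timestamp, payload)
tuples and that disorder is bounded by a hypothetical
max_out_of_orderness value. For daily export files in arbitrary order,
that bound would have to cover at least one full day.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Event = Tuple[int, str]  # (event time, payload); the shape is an assumption


def reorder_by_event_time(events: Iterable[Event],
                          max_out_of_orderness: int) -> Iterator[Event]:
    """Yield events in timestamp order, tolerating bounded disorder."""
    buffer: List[Event] = []
    watermark = float("-inf")
    for event in events:
        ts = event[0]
        if ts <= watermark:
            continue  # late event: this sketch simply drops it
        heapq.heappush(buffer, event)
        # The watermark trails the highest timestamp seen so far.
        watermark = max(watermark, ts - max_out_of_orderness)
        # Everything at or below the watermark can be released in order.
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)
    while buffer:  # end of input: flush what is left, still in order
        yield heapq.heappop(buffer)


shuffled = [(3, "c"), (1, "a"), (2, "b"), (5, "e"), (4, "d")]
ordered = list(reorder_by_event_time(shuffled, max_out_of_orderness=3))
```

The larger the allowed disorder, the more records sit in the buffer
before they can be released, which is one reason state can grow.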

3) Occasionally some crazy people manually "correct" DB records
within the database and manually trigger a re-export of ALL of the
changes for that respective day (e.g. last week's Tuesday).
Consequently we receive a correction file. Same filename but "_1"
appended. All filenames include the date (of the original export).
What are the options to handle that case (besides telling the DB
admins not to, which we did already). Regular checkpoints and
re-process all files since then?  What happens to the CEP state? Will
it be checkpointed as well?

-> If you require re-processing, then I would say that your best
option is what you described. The other option would be to keep
everything in Flink state until you are sure that no more corrections
will come. In this case, you have to somehow issue the "correction" in
a way that the downstream system can understand what to correct and
how. Keep in mind that this may be an expensive operation because
everything has to be kept in state for longer.
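
For the re-processing route, one small building block is picking the
newest export per day before feeding files to the job. A minimal sketch
follows; the "<table>_<YYYYMMDD>[_<n>].csv" naming scheme is purely an
assumption, since the thread only says that the date is in the filename
and that corrections get "_1" appended.

```python
import re
from typing import Dict, Iterable, List, Tuple

# Hypothetical naming scheme: "orders_20200204.csv", "orders_20200204_1.csv".
FILENAME = re.compile(r"^(?P<table>.+?)_(?P<date>\d{8})(?:_(?P<rev>\d+))?\.csv$")


def latest_revisions(filenames: Iterable[str]) -> List[str]:
    """Keep only the highest revision for each (table, export date)."""
    best: Dict[Tuple[str, str], Tuple[int, str]] = {}
    for name in filenames:
        match = FILENAME.match(name)
        if match is None:
            continue  # not an export file under the assumed scheme
        key = (match["table"], match["date"])
        revision = int(match["rev"] or 0)  # no suffix means the original export
        if key not in best or revision > best[key][0]:
            best[key] = (revision, name)
    # Sorted by table, then date, so files are replayed chronologically.
    return [name for _, (_, name) in sorted(best.items())]


to_replay = latest_revisions([
    "orders_20200204.csv",
    "orders_20200205.csv",
    "orders_20200204_1.csv",
])
```

Re-processing would then start from a savepoint or checkpoint taken
before the earliest corrected date and replay the selected files.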

4) Our CEP rules span up to 180 days (resp. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not
necessarily an issue. If your state for these 180 days is a couple of
MBs, then you have no problem. If it increases fast, then you have to
provision your cluster accordingly.
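
As a very rough first-order sizing aid (not a formula from Flink), one
could multiply the retained events per day by record size, window
length and number of rules. All parameter names and numbers below are
made up for illustration, and the result ignores any bookkeeping
overhead and the extra records CEP may buffer for ordering.

```python
def rough_state_bytes(events_per_day: int, retained_percent: int,
                      bytes_per_event: int, window_days: int,
                      rules: int) -> int:
    """First-order estimate of state held for partial matches."""
    # Only events that can take part in some pattern need to be retained.
    retained_per_day = events_per_day * retained_percent // 100
    return retained_per_day * bytes_per_event * window_days * rules


# Hypothetical workload: 1M events/day, 10% relevant, 200 bytes each.
one_rule = rough_state_bytes(1_000_000, 10, 200, 180, rules=1)
forty_rules = rough_state_bytes(1_000_000, 10, 200, 180, rules=40)
```

Under these invented numbers a single 180-day rule is in the low-GB
range, while 40 independent rules multiply that accordingly.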

5) We also have CEP rules that must fire if after a start sequence
matched, the remaining sequence did NOT occur within a configured window.
E.g. If A, then B, but C did not occur within 30 days since A. Is that
supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations
and you can also look at [2] for some tests of different pattern
combinations.
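
As far as I understand the CEP docs, one way to approach "C did not
occur" is to define the full A, B, C pattern with a time window and
react to partial matches that time out (see
TimedOutPartialMatchHandler). The plain-Python sketch below only
illustrates that idea and is not FlinkCEP code. It assumes
(timestamp, key, type) tuples in timestamp order, and that only a C
arriving after B cancels the alert.

```python
from typing import Dict, Iterable, List, Tuple

DAY = 24 * 60 * 60  # seconds


def a_b_without_c(events: Iterable[Tuple[int, str, str]],
                  final_watermark: int,
                  window: int = 30 * DAY) -> List[Tuple[str, int]]:
    """Return (key, time of A) where A then B happened but no C within `window` of A."""
    partial: Dict[str, Tuple[int, bool]] = {}  # key -> (time of A, B seen?)
    alerts: List[Tuple[str, int]] = []

    def expire(now: int) -> None:
        # A partial match times out once `now` is more than `window` past its A.
        expired = [k for k, (start, _) in partial.items() if now - start > window]
        for k in expired:
            start, seen_b = partial.pop(k)
            if seen_b:  # A and B matched, C never came in time
                alerts.append((k, start))

    for ts, key, kind in events:
        expire(ts)
        if kind == "A" and key not in partial:
            partial[key] = (ts, False)
        elif kind == "B" and key in partial:
            partial[key] = (partial[key][0], True)
        elif kind == "C" and key in partial and partial[key][1]:
            del partial[key]  # full sequence completed in time, no alert
    expire(final_watermark)  # time advancing past the window fires the rest
    return alerts


stream = [(0, "k1", "A"), (1 * DAY, "k1", "B"),
          (2 * DAY, "k2", "A"), (3 * DAY, "k2", "B"), (10 * DAY, "k2", "C")]
missing_c = a_b_without_c(stream, final_watermark=40 * DAY)
```

Note that the alert can only fire once time has moved past the window,
so with daily files it would show up when a later file advances time.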

6) We expect 30-40 CEP rules. How can we estimate the required storage
size for the temporary CEP state? Is there some sort of formula
considering number of rules, number of records per file or day, record
size, window, number of records matched per sequence, number of keyBy
grouping keys, ...

-> In FlinkCEP, each pattern becomes a single operator. This means
that you will have 30-40 operators in your job graph, each with its
own state. This can become heavy but once again it depends on your
workload. I cannot give an estimate because in CEP, in order to
guarantee correct ordering of events in an unordered stream, the
library sometimes has to keep more records in state than will
appear in the final matches.

Have you considered going with a solution based on ProcessFunction and
broadcast state? This will also allow you to have a more dynamic
set-up where patterns can be added at runtime and it will allow you to
do any optimizations specific to your workload ;) For a discussion on
this, check [3]. In addition, it will allow you to "multiplex" many
patterns into a single operator, thus potentially minimizing the number
of copies of the state you keep.
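
To illustrate the multiplexing idea, here is a minimal plain-Python
sketch (not the code from [3] and not Flink API). Rules are plain data
that can be added at runtime, and a single matcher tracks progress per
(key, rule). The Rule and MultiRuleMatcher names are invented, and the
matching is deliberately simplistic (one partial match per key and
rule, other events in between are ignored).

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Rule:
    name: str
    sequence: Tuple[str, ...]  # expected event types, in order (non-empty)
    window: int                # max time between first and last event


class MultiRuleMatcher:
    def __init__(self) -> None:
        self.rules: Dict[str, Rule] = {}
        # (key, rule name) -> (time of first event, index of next expected type)
        self.progress: Dict[Tuple[str, str], Tuple[int, int]] = {}

    def add_rule(self, rule: Rule) -> None:
        """Plays the role of the broadcast side: rules arrive at runtime."""
        self.rules[rule.name] = rule

    def on_event(self, ts: int, key: str, kind: str) -> List[str]:
        """Feed one event (in timestamp order); return names of rules that fired."""
        fired: List[str] = []
        for rule in self.rules.values():
            slot = (key, rule.name)
            start, idx = self.progress.pop(slot, (ts, 0))
            if ts - start > rule.window:
                start, idx = ts, 0  # partial match timed out, start over
            if kind == rule.sequence[idx]:
                idx += 1
            if idx == len(rule.sequence):
                fired.append(rule.name)
            elif idx > 0:
                self.progress[slot] = (start, idx)
        return fired


matcher = MultiRuleMatcher()
matcher.add_rule(Rule("abc", ("A", "B", "C"), window=30))
matcher.add_rule(Rule("ac", ("A", "C"), window=5))
results = [matcher.on_event(0, "k", "A"),
           matcher.on_event(1, "k", "B"),
           matcher.on_event(10, "k", "C")]
```

Here both rules share one pass over the event stream, and the "ac" rule
does not fire because its 5-unit window expired before C arrived.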

7) I can imagine that for debugging reasons it'd be good if we were
able to query the temporary CEP state. What is the (CEP) schema used
to persist the CEP state and how can we query it? And does such query
work on the whole cluster or only per node (e.g. because of shuffle
and nodes responsible only for a portion of the events).

-> Unfortunately, the state in CEP is not queryable, thus I am not
sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to
add or remove nodes? Will the state still be found, despite it being
stored in another node? I read that I need to be equally careful when
changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a
savepoint and then relaunch your job with a different parallelism.
Updating a rule is not supported in CEP, as changing a rule would
imply that (potentially) the state should change. But what you could
do is take a savepoint, remove the old pattern and add a new one (the
updated one) and tell Flink to ignore the state of the previous
operator (as said earlier each CEP pattern is translated to an
operator).
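
On "will the state still be found": keyed state in Flink is organized
in key groups, whose number is fixed by the job's maximum parallelism.
When a savepoint is restored with a different parallelism, whole key
groups are reassigned to the new parallel instances. The sketch below
mimics that assignment in plain Python; the crc32 hash is only a
stand-in for Flink's own hashing of the key.

```python
import zlib


def key_group(key: str, max_parallelism: int) -> int:
    """Key group of a key. Stand-in hash; Flink hashes key.hashCode() itself."""
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index(group: int, max_parallelism: int, parallelism: int) -> int:
    """Parallel instance that owns a key group at the given parallelism."""
    return group * parallelism // max_parallelism


MAX_PARALLELISM = 128
group = key_group("customer-42", MAX_PARALLELISM)
owner_before = operator_index(group, MAX_PARALLELISM, parallelism=4)
owner_after = operator_index(group, MAX_PARALLELISM, parallelism=8)
```

The key group of a key never changes; only the instance that owns the
group does, which is why state can follow its keys after rescaling.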

9) How does garbage collection of temp CEP state work, or will it stay
forever?  For tracing/investigation reasons I can imagine that purging
it at the earliest possible time is not always the best option. Maybe
30 days later or so.

-> CEP cleans up its state after the time horizon (specified with the
.within() clause) expires.

10) Are there strategies to minimize temp CEP state? In SQL queries
you filter first on the "smallest" attributes. CEP rules form a
sequence. Hence that approach will not work. Is that an issue at all?
What are practical limits on the CEP temp state storage engine?

-> Such optimizations are not supported out of the box. I would
recommend going with the Broadcast state approach in [3].

11) Occasionally we need to process about 200 files at once. Can I
speed things up by processing all files in parallel on multiple nodes,
despite their sequence (CEP use case)? This would only work if
FlinkCEP in step 1 simply filters on all relevant events of a
sequence, updates state, and in a step 2 - after the files are
processed - evaluates the updated state if that meets the sequences.

12) Schema changes in the input files: Occasionally the DB source
system schema is changed, and not always in a backwards compatible way
(insert new fields in the middle), and also the export will have the
field in the middle. This means that starting from a specific (file)
date, I need to consider a different schema. This must also be handled
when re-running files for the last month, because of corrections
provided. And if the file format has changed somewhere in the middle ...

-> This seems to be relevant for the "data cleaning" phase, before you
send your data to CEP. In this case, if the schema changes, then I
assume that you need to update your initial parsing logic, which means
taking a savepoint and redeploying the updated jobGraph with the new
input parsing logic (if I understand correctly).
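
For the re-run case, where one replay spans files from before and after
a schema change, the parsing step could pick the schema by file date. A
minimal sketch follows; the schema history, column names and CSV layout
are all hypothetical.

```python
import bisect
from datetime import date
from typing import Dict, List, Sequence, Tuple

# Hypothetical history: (first file date it applies to, columns in file order).
SCHEMAS: List[Tuple[date, Sequence[str]]] = [
    (date(2019, 1, 1), ("id", "status", "modified_at")),
    # A field was inserted in the middle from this file date on.
    (date(2020, 2, 1), ("id", "region", "status", "modified_at")),
]


def schema_for(file_date: date) -> Sequence[str]:
    """Return the newest schema whose start date is on or before file_date."""
    starts = [start for start, _ in SCHEMAS]
    i = bisect.bisect_right(starts, file_date) - 1
    if i < 0:
        raise ValueError(f"no schema known for {file_date}")
    return SCHEMAS[i][1]


def parse_line(line: str, file_date: date) -> Dict[str, str]:
    """Parse one comma-separated record using the schema valid for its file."""
    columns = schema_for(file_date)
    values = line.rstrip("\n").split(",")
    if len(values) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(values)}")
    return dict(zip(columns, values))


old_record = parse_line("7,open,2020-01-15", date(2020, 1, 15))
new_record = parse_line("7,EU,open,2020-02-03", date(2020, 2, 3))
```

Downstream of the parser, both records can then be mapped to one common
event type so that the CEP patterns do not need to know about the
schema change.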

thanks a lot for your time and your help

I hope that the above helps!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
[3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html

On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
<[hidden email]> wrote:
>
> Hi,
>
> we're in very early stages evaluating options. I'm not a Flink expert, but did read some of the docs and watched videos. Could you please help me understand if and how certain of our reqs are covered by Flink (CEP). Is this mailing list the right channel for such questions?
>
> 1) We receive files every day, which are exports from some database tables, containing ONLY changes from the day. Most tables have modify-cols. Even though they are files but because they contain changes only, I belief the file records shall be considered events in Flink terminology. Is that assumption correct?
>
> 2) The records within the DB export files are NOT in chronologically, and we can not change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP despite the events/records are not in chrono order within the file? The files are 100MB to 20GB in size. Do I need to sort the files first before CEP processing?
>
> 3) Occassionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for that respective day (e.g. last weeks Tuesday). Consequently we receive a correction file. Same filename but "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to, which we did already). Regular checkpoints and re-process all files since then?  What happens to the CEP state? Will it be checkpointed as well?
>
> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem?
>
> 5) We also have CEP rules that must fire if after a start sequence matched, the remaining sequence did NOT within a configured window. E.g. If A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formular considering number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...
>
> 7) I can imagine that for debugging reasons it'd be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state and how can we query it? And does such query work on the whole cluster or only per node (e.g. because of shuffle and nodes responsible only for a portion of the events).
>
> 8) I understand state is stored per node. What happens if I want to add or remove a nodes. Will the state still be found, despite it being stored in another node? I read that I need to be equally careful when changing rules? Or is that a different issue?
>
> 9) How does garbage collection of temp CEP state work, or will it stay forever?  For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. May be after 30 days later or so.
>
> 10) Are there strategies to minimize temp CEP state? In SQL queries you  filter first on the "smallest" attributes. CEP rules form a sequence. Hence that approach will not work. Is that an issue at all? What are practical limits on the CEP temp state storage engine?
>
> 11) Occassionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filters on all relevant events of a sequence, updates state, and in a step 2 - after the files are processed - evaluates the updated state if that meets the sequences.
>
> 12) Schema changes in the input files: Occassionly the DB source system schema is changed, and not always in a backwards compatible way (insert new fields in the middle), and also the export will have the field in the middle. This means that starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month, because of corrections provided. And if the file format has changed someone in the middle ...
>
> thanks a lot for your time and your help
> Juergen
--

MotaWord
Oytun Tez
M O T A W O R D CTO & Co-Founder
[hidden email]