Hi,
we're in the very early stages of evaluating options. I'm not a Flink expert, but I did read some of the docs and watched videos. Could you please help me understand if and how certain of our requirements are covered by Flink (CEP)? Is this mailing list the right channel for such questions?

1) We receive files every day, which are exports from some database tables, containing ONLY the changes from that day. Most tables have modify-columns. Even though they are files, because they contain only changes I believe the file records should be considered events in Flink terminology. Is that assumption correct?

2) The records within the DB export files are NOT in chronological order, and we cannot change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "KeyBy(someKey): if first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP even though the events/records are not in chronological order within the file? The files are 100MB to 20GB in size. Do I need to sort the files before CEP processing?

3) Occasionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for that respective day (e.g. last week's Tuesday). Consequently we receive a correction file: same filename but with "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to do it, which we did already)? Regular checkpoints and re-processing all files since then? What happens to the CEP state? Will it be checkpointed as well?

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

5) We also have CEP rules that must fire if, after a start sequence matched, the remaining sequence did NOT complete within a configured window. E.g. if A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.

6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formula considering number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...?

7) I can imagine that for debugging reasons it'd be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state and how can we query it? And does such a query work on the whole cluster or only per node (e.g. because of shuffles and nodes being responsible only for a portion of the events)?

8) I understand state is stored per node. What happens if I want to add or remove a node? Will the state still be found, despite it being stored on another node? I read that I need to be equally careful when changing rules? Or is that a different issue?

9) How does garbage collection of temporary CEP state work, or will it stay forever? For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. Maybe 30 days later or so.

10) Are there strategies to minimize temporary CEP state? In SQL queries you filter first on the "smallest" attributes. CEP rules form a sequence, hence that approach will not work. Is that an issue at all? What are the practical limits of the CEP temp state storage engine?

11) Occasionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filters on all relevant events of a sequence and updates state, and in step 2 - after the files are processed - evaluates the updated state to see whether it meets the sequences.

12) Schema changes in the input files: occasionally the DB source system schema is changed, and not always in a backwards compatible way (new fields inserted in the middle), and the export will then also have the field in the middle. This means that starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month, because of corrections provided. And if the file format has changed somewhere in the middle ...

thanks a lot for your time and your help
Juergen
Hi Juergen,

1) Yes, you are effectively processing a changelog of events. If you need more information, you could search for "change data capture" architectures. For all CEP questions, I'm pulling in Kostas.

12) It depends on which format the data is exported in. If you use a format with schema evolution (e.g. Avro), then schema changes will be handled gracefully.

Best,
Arvid
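To illustrate the Avro point: the writer's schema is stored in the data file and resolved against the reader's schema at read time, and fields are accessed by name rather than by position, so a column inserted "in the middle" on the DB side does not shift the remaining fields. A minimal sketch, assuming hypothetical schema, file and field names:

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroEvolutionSketch {
    public static void main(String[] args) throws Exception {
        // Reader schema of the current parsing logic (hypothetical file name).
        Schema readerSchema = new Schema.Parser().parse(new File("customer_v2.avsc"));

        // The writer schema is taken from the Avro file header; Avro resolves
        // writer vs. reader schema, so older and newer exports can be read
        // with the same reader schema.
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(readerSchema);
        try (DataFileReader<GenericRecord> fileReader =
                 new DataFileReader<>(new File("export_2020-02-10.avro"), datumReader)) {
            while (fileReader.hasNext()) {
                GenericRecord record = fileReader.next();
                // Fields are looked up by name, not by position.
                Object modifiedAt = record.get("modified_at"); // hypothetical column
            }
        }
    }
}
```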
Hi Juergen,
I will reply to your questions inline. As a general comment, I would suggest also having a look at [3] so that you have an idea of some of the alternatives. With that said, here come the answers :)

1) We receive files every day, which are exports from some database tables, containing ONLY the changes from that day. Most tables have modify-columns. Even though they are files, because they contain only changes I believe the file records should be considered events in Flink terminology. Is that assumption correct?

-> Yes, I think your assumption is correct.

2) The records within the DB export files are NOT in chronological order, and we cannot change the export. Our use case is a "complex event processing" case (FlinkCEP) with rules like "KeyBy(someKey): if first A, then B, then C within 30 days, then do something". Does that work with FlinkCEP even though the events/records are not in chronological order within the file? The files are 100MB to 20GB in size. Do I need to sort the files before CEP processing?

-> Flink CEP also works in event time, and the re-ordering can then be done by Flink (a small event-time sketch is attached at the end of this reply).

3) Occasionally some crazy people manually "correct" DB records within the database and manually trigger a re-export of ALL of the changes for that respective day (e.g. last week's Tuesday). Consequently we receive a correction file: same filename but with "_1" appended. All filenames include the date (of the original export). What are the options to handle that case (besides telling the DB admins not to do it, which we did already)? Regular checkpoints and re-processing all files since then? What happens to the CEP state? Will it be checkpointed as well?

-> If you require re-processing, then I would say that your best option is what you described. The other option would be to keep everything in Flink state until you are sure that no more corrections will come. In this case, you have to somehow issue the "correction" in a way that the downstream system can understand what to correct and how. Keep in mind that this may be an expensive operation, because everything has to be kept in state for longer.

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

-> The only thing to consider is the size of your state. Time is not necessarily an issue. If your state for these 180 days is a couple of MBs, then you have no problem. If it increases fast, then you have to provision your cluster accordingly.

5) We also have CEP rules that must fire if, after a start sequence matched, the remaining sequence did NOT complete within a configured window. E.g. if A, then B, but C did not occur within 30 days since A. Is that supported by FlinkCEP? I couldn't find a working example.

-> You can have a look at [1] for the supported pattern combinations and at [2] for some tests of different pattern combinations. (One way to express the "C did not occur within 30 days" case is sketched at the end of this reply.)

6) We expect 30-40 CEP rules. How can we estimate the required storage size for the temporary CEP state? Is there some sort of formula considering number of rules, number of records per file or day, record size, window, number of records matched per sequence, number of keyBy grouping keys, ...?

-> In FlinkCEP, each pattern becomes a single operator. This means that you will have 30-40 operators in your job graph, each with its own state. This can become heavy, but once again it depends on your workload. I cannot give an estimate, because in CEP, in order to guarantee correct ordering of events in an unordered stream, the library sometimes also has to keep more records in state than will be presented at the end.
Continuing on question 6: have you considered going with a solution based on a ProcessFunction and broadcast state? This will also allow you to have a more dynamic set-up where patterns can be added at runtime, and it will allow you to do any optimizations specific to your workload ;) For a discussion of this, check [3]. In addition, it will allow you to "multiplex" many patterns into a single operator, thus potentially minimizing the number of copies of the state you keep. (A minimal skeleton of this approach is also attached at the end of this reply.)

7) I can imagine that for debugging reasons it'd be good if we were able to query the temporary CEP state. What is the (CEP) schema used to persist the CEP state and how can we query it? And does such a query work on the whole cluster or only per node (e.g. because of shuffles and nodes being responsible only for a portion of the events)?

-> Unfortunately the state in CEP is not queryable, thus I am not sure if you can inspect it at runtime.

8) I understand state is stored per node. What happens if I want to add or remove a node? Will the state still be found, despite it being stored on another node? I read that I need to be equally careful when changing rules? Or is that a different issue?

-> Rescaling a Flink job is not done automatically. You need to take a savepoint and then relaunch your job with a different parallelism. Updating a rule is not supported in CEP, as changing a rule would imply that (potentially) the state should change. But what you could do is take a savepoint, remove the old pattern, add a new one (the updated one) and tell Flink to ignore the state of the previous operator (as said earlier, each CEP pattern is translated to an operator).

9) How does garbage collection of temporary CEP state work, or will it stay forever? For tracing/investigation reasons I can imagine that purging it at the earliest possible time is not always the best option. Maybe 30 days later or so.

-> CEP cleans up its state after the time horizon (specified with the .within() clause) expires.

10) Are there strategies to minimize temporary CEP state? In SQL queries you filter first on the "smallest" attributes. CEP rules form a sequence, hence that approach will not work. Is that an issue at all? What are the practical limits of the CEP temp state storage engine?

-> Such optimizations are not supported out of the box. I would recommend going with the broadcast state approach in [3].

11) Occasionally we need to process about 200 files at once. Can I speed things up by processing all files in parallel on multiple nodes, despite their sequence (CEP use case)? This would only work if FlinkCEP in step 1 simply filters on all relevant events of a sequence and updates state, and in step 2 - after the files are processed - evaluates the updated state to see whether it meets the sequences.

12) Schema changes in the input files: occasionally the DB source system schema is changed, and not always in a backwards compatible way (new fields inserted in the middle), and the export will then also have the field in the middle. This means that starting from a specific (file) date, I need to consider a different schema. This must also be handled when re-running files for the last month, because of corrections provided. And if the file format has changed somewhere in the middle ...

-> This seems to be relevant for the "data cleaning" phase, before you send your data to CEP. In this case, if the schema changes, then I assume that you need to update your initial parsing logic, which means taking a savepoint and redeploying the updated job graph with the new input parsing logic (if I understand correctly).
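Regarding question 2 (records not in chronological order): a minimal event-time sketch of what the pre-CEP part could look like. The Event POJO, its fields and the one-day out-of-orderness bound are placeholders and assumptions, not something prescribed by Flink, and the API shown is the 1.10-era one (newer versions use WatermarkStrategy instead):

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeCepSketch {

    /** Minimal placeholder for a parsed export record. */
    public static class Event {
        public String key;       // the keyBy attribute
        public String type;      // "A", "B", "C", ...
        public long modifiedAt;  // event time taken from a modify-column

        public Event() {}
        public Event(String key, String type, long modifiedAt) {
            this.key = key; this.type = type; this.modifiedAt = modifiedAt;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // In the real job this would come from reading/parsing the daily files.
        DataStream<Event> rawEvents = env.fromElements(
                new Event("k1", "B", 2_000L),   // deliberately out of order
                new Event("k1", "A", 1_000L));

        // Records inside one daily file can be arbitrarily out of order, so use a
        // generous out-of-orderness bound (one day here, just an assumption).
        DataStream<Event> withTimestamps = rawEvents.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Event event) {
                        return event.modifiedAt;
                    }
                });

        // In event time, CEP buffers and sorts events by timestamp up to the current
        // watermark before applying the pattern, so the order within the file itself
        // does not matter. A real rule would add conditions and a .within() window.
        Pattern<Event, ?> pattern = Pattern.<Event>begin("A");
        PatternStream<Event> patternStream =
                CEP.pattern(withTimestamps.keyBy(e -> e.key), pattern);

        patternStream.select(new PatternSelectFunction<Event, String>() {
            @Override
            public String select(Map<String, List<Event>> match) {
                return "matched A for key " + match.get("A").get(0).key;
            }
        }).print();

        env.execute("event-time CEP sketch");
    }
}
```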
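Regarding question 5 (fire when C did NOT occur within the window): FlinkCEP, at least in the versions current at the time of this thread, does not allow a pattern to end with notFollowedBy(), so one common way to express this is to model the full A -> B -> C sequence with .within(...) and treat the timed-out partial matches, delivered via a side output, as the alerts. A rough sketch only; it assumes the same hypothetical Event POJO (key/type fields) used in the event-time sketch in this reply, and keyedEvents being the keyed, watermarked stream:

```java
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class AbsenceRuleSketch {

    // keyedEvents: keyed, watermarked stream of the hypothetical Event POJO.
    public static DataStream<String> absenceAlerts(KeyedStream<Event, String> keyedEvents) {

        // Model the full sequence A -> B -> C within 30 days; the *timed-out*
        // partial matches (A and B seen, C never arrived) are the real alerts.
        Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
                .where(new SimpleCondition<Event>() {
                    @Override public boolean filter(Event e) { return "A".equals(e.type); }
                })
                .followedBy("B")
                .where(new SimpleCondition<Event>() {
                    @Override public boolean filter(Event e) { return "B".equals(e.type); }
                })
                .followedBy("C")
                .where(new SimpleCondition<Event>() {
                    @Override public boolean filter(Event e) { return "C".equals(e.type); }
                })
                .within(Time.days(30));

        // Side output that receives partial matches whose 30-day window expired.
        final OutputTag<String> timedOut = new OutputTag<String>("timed-out-partial-matches"){};

        PatternStream<Event> patternStream = CEP.pattern(keyedEvents, pattern);

        SingleOutputStreamOperator<String> fullMatches = patternStream.flatSelect(
                timedOut,
                new PatternFlatTimeoutFunction<Event, String>() {
                    @Override
                    public void timeout(Map<String, List<Event>> partial, long timeoutTimestamp,
                                        Collector<String> out) {
                        // Only alert if A *and* B matched but C never showed up;
                        // a timeout with only A means the rule was never fully "armed".
                        if (partial.containsKey("B")) {
                            out.collect("C missing within 30 days for key "
                                    + partial.get("A").get(0).key);
                        }
                    }
                },
                new PatternFlatSelectFunction<Event, String>() {
                    @Override
                    public void flatSelect(Map<String, List<Event>> match, Collector<String> out) {
                        // A complete A -> B -> C match is not an alert for this rule.
                    }
                });

        // The alerts for this rule come from the side output, not the main output.
        return fullMatches.getSideOutput(timedOut);
    }
}
```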
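And a minimal skeleton of the ProcessFunction/broadcast-state alternative mentioned under 6) and 10): rule definitions are broadcast to all parallel instances and kept in broadcast state, while the per-key partial matches would live in keyed state. Everything here (Rule, the Event POJO reused from the event-time sketch, the matching logic) is a placeholder that only shows the wiring; [3] describes the full approach:

```java
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicRulesSketch {

    /** Hypothetical rule representation, e.g. parsed from a config file or topic. */
    public static class Rule {
        public String id;
        public String[] sequence;   // e.g. {"A", "B", "C"}
        public long windowMillis;   // e.g. 30 days
        public Rule() {}
    }

    public static DataStream<String> wire(KeyedStream<Event, String> keyedEvents,
                                          DataStream<Rule> ruleUpdates) {

        // Descriptor for the broadcast state that holds all active rules.
        final MapStateDescriptor<String, Rule> rulesDescriptor =
                new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

        BroadcastStream<Rule> ruleBroadcast = ruleUpdates.broadcast(rulesDescriptor);

        return keyedEvents
                .connect(ruleBroadcast)
                .process(new KeyedBroadcastProcessFunction<String, Event, Rule, String>() {

                    @Override
                    public void processElement(Event event, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Evaluate the event against every active rule. Partial matches
                        // per (key, rule) would live in keyed state (e.g. MapState) and
                        // be cleaned up with timers once a rule's window has passed.
                        for (Map.Entry<String, Rule> entry :
                                ctx.getBroadcastState(rulesDescriptor).immutableEntries()) {
                            Rule rule = entry.getValue();
                            // ... update the partial match for this rule and emit an
                            //     alert via out.collect(...) if the sequence completed.
                        }
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Add or replace a rule at runtime; every parallel instance
                        // receives the same broadcast element.
                        ctx.getBroadcastState(rulesDescriptor).put(rule.id, rule);
                    }
                });
    }
}
```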
I hope that the above helps!

Cheers,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
[3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
thanks a lot
Juergen
Amazing content, thanks for asking and answering.