Hi
My streaming job uses a set of rules to process records from a stream. The rule set is defined in simple flat files, one rule per line, and it can change from time to time: a user uploads a new file that must replace the old rule set completely.

My problem is with reading and updating the rule set when a new one arrives. I cannot update single rules; I need the whole rule set, both to validate it and to build the internal representation I broadcast. I am reading the file with a ContinuousFileReaderOperator and an InputFormat (via env.readFile(...) with processingMode = PROCESS_CONTINUOUSLY), then building the internal representation of the rule set and broadcasting it.

How do I know when I have read ALL the records from a physical file, so I can trigger validating and building the new rule set? I have been thinking about a processing-time trigger that waits a reasonable time after reading the first rule of a new file, but that does not look safe if the user, for example, uploads two new files by mistake.

Cheers
Lorenzo
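For context, a minimal sketch of the setup described above, with illustrative names (the directory, re-scan interval, and descriptor types are placeholders, not from the thread):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class RuleJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Watch a directory and keep re-reading rule files as they appear.
        String rulesDir = "/path/to/rules"; // hypothetical upload directory
        DataStream<String> ruleLines = env.readFile(
                new TextInputFormat(new Path(rulesDir)),
                rulesDir,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                60_000L); // re-scan interval in ms (illustrative)

        // The rules are broadcast to the operator processing the main record
        // stream; descriptor types are simplified to String here.
        MapStateDescriptor<String, String> rulesDescriptor = new MapStateDescriptor<>(
                "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<String> broadcastRules = ruleLines.broadcast(rulesDescriptor);

        ruleLines.print(); // stand-in sink so the sketch runs on its own
        env.execute("rule-set-sketch");
    }
}
```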
Hi Lorenzo,

what you could try is deriving your own InputFormat (extending FileInputFormat) and setting the field `unsplittable` to true. That way an InputSplit is the whole file, and you can handle the set of new rules as a single record.

Cheers,
Till
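A minimal sketch of that idea (the class name and buffering details are illustrative; `unsplittable` and `stream` are protected members of Flink's FileInputFormat):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

// Mark the format unsplittable so each InputSplit covers a whole file,
// then emit the complete file contents as one String record.
public class WholeFileInputFormat extends FileInputFormat<String> {

    private boolean emitted;

    public WholeFileInputFormat() {
        unsplittable = true; // one InputSplit == one physical file
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        emitted = false;
    }

    @Override
    public boolean reachedEnd() {
        return emitted; // exactly one record per file
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        // 'stream' is the FSDataInputStream opened by FileInputFormat.open();
        // since the split is the whole file, reading to EOF reads the file.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int read;
        while ((read = stream.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        emitted = true;
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```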
Thanks Till,

I understand that making my FileInputFormat unsplittable guarantees a file is always read by a single task. But how can I produce a single record for the entire file?

As my file is a CSV with some idiosyncrasies, I am extending CsvInputFormat so as not to reinvent the wheel of CSV parsing and type conversion. That generates one record per line, and I cannot see any handle for the end of file. I have been thinking of using a GlobalWindow to process all the rules at once when I reach the end of the file, but what could I use as a trigger?

Regards
Lorenzo
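One way to sidestep the end-of-file question entirely, sketched under the assumption that the per-line parsing can be done by hand inside the unsplittable format (`Rule`, `RuleSet`, and their `parse`/`validateAndBuild` helpers are hypothetical placeholders for the thread's internal representation, not Flink API):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

// Parse the CSV lines inside the unsplittable format and emit one fully
// built RuleSet per file, so no downstream window or trigger is needed.
public class RuleSetInputFormat extends FileInputFormat<RuleSet> {

    private boolean emitted;

    public RuleSetInputFormat() {
        unsplittable = true; // the split is the whole rules file
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        emitted = false;
    }

    @Override
    public boolean reachedEnd() {
        return emitted;
    }

    @Override
    public RuleSet nextRecord(RuleSet reuse) throws IOException {
        // Do not close the reader here: the enclosing format closes 'stream'.
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        List<Rule> rules = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            rules.add(Rule.parse(line)); // placeholder for the CSV idiosyncrasies
        }
        emitted = true;
        // The whole file has been read at this point, so validation and
        // building the internal representation can happen right here.
        return RuleSet.validateAndBuild(rules);
    }
}
```

The trade-off is giving up CsvInputFormat's built-in parsing; in exchange, end-of-file handling and validation collapse into `nextRecord`, and each file arrives downstream as a single atomic rule set ready to broadcast.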