Reading and updating rule-sets from a file

Lorenzo Nicora
Hi

My streaming job uses a set of rules to process records from a stream.
The rule set is defined in simple flat files, one rule per line.
The rule set can change from time to time. A user will upload a new file that must replace the old rule set completely.

My problem is with reading and updating the rule set when I have a new one.
I cannot update single rules. I need the whole rule set to validate it and build the internal representation to broadcast.

I am reading the file with a ContinuousFileReaderOperator and an InputFormat (via env.readFile(...)), building the internal representation of the rule set, and then broadcasting it. I pick up new files with processingMode = PROCESS_CONTINUOUSLY.

How do I know when I have read ALL the records from a physical file, to trigger validating and building the new Rule Set?

I've been thinking about a processing-time trigger, waiting a reasonable time after I read the first rule of a new file, but it does not look safe if the user, for example, uploads two new files by mistake. 

Cheers
Lorenzo

Re: Reading and updating rule-sets from a file

Till Rohrmann
Hi Lorenzo,

What you could try is to derive your own InputFormat (extending FileInputFormat) and set the field `unsplittable` to true. That way, an InputSplit is the whole file and you can handle the set of new rules as a single record.

Cheers,
Till
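
The "whole file as a single record" idea can be sketched in plain Java, without any Flink classes. This is only an illustration under assumptions: `RuleSet`, `WholeFileRuleReader`, and the "a rule must contain `=`" check are hypothetical and not from the thread. In a Flink job, the same read-to-the-end logic would presumably sit in the record-reading method of a FileInputFormat subclass that sets `unsplittable` to true.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable rule set built from one complete file. */
final class RuleSet {
    private final List<String> rules;

    RuleSet(List<String> rules) {
        // Defensive copy so the set cannot change after validation.
        this.rules = Collections.unmodifiableList(new ArrayList<>(rules));
    }

    List<String> rules() {
        return rules;
    }
}

/** Reads a whole rule file and returns it as ONE record, or fails as a whole. */
final class WholeFileRuleReader {

    /**
     * Consumes the reader to the end of the input. Blank lines are skipped.
     * A line without '=' is rejected; this validation rule is made up for
     * the sketch and stands in for the real per-rule checks.
     */
    static RuleSet readAll(Reader source) {
        List<String> rules = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(source)) {
            String line;
            int lineNo = 0;
            while ((line = in.readLine()) != null) {
                lineNo++;
                String rule = line.trim();
                if (rule.isEmpty()) {
                    continue;
                }
                if (!rule.contains("=")) {
                    // One bad line invalidates the entire file.
                    throw new IllegalArgumentException(
                            "Invalid rule at line " + lineNo + ": " + rule);
                }
                rules.add(rule);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new RuleSet(rules);
    }
}
```

Because the file is consumed in one call, downstream code sees either a complete, validated `RuleSet` or an exception, never a partially read file.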

On Mon, Jun 29, 2020 at 3:52 PM Lorenzo Nicora <[hidden email]> wrote:
> [...]

Re: Reading and updating rule-sets from a file

Lorenzo Nicora
Thanks Till,

I understand making my FileInputFormat "unsplittable" guarantees a file is always read by a single task. But how can I produce a single record for the entire file?

As my file is a CSV with some idiosyncrasies, I am extending CsvInputFormat so as not to reinvent CSV parsing and type conversion. This generates one record per line, and I cannot see any hook for the end of the file.

I've been thinking of using a GlobalWindow to process all the rules at once when I reach the end of the file, but what can I use as a trigger?

Regards
Lorenzo
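
One possible shape for such a trigger, sketched in plain Java rather than with Flink's window API: buffer the per-line records by source file and release them only when an explicit end-of-file marker for that file arrives. This rests on an assumption the thread does not confirm, namely that the input format can be made to emit a final marker record per file. `RuleLine` and `EndOfFileBatcher` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical per-line record tagged with its source file and an end-of-file flag. */
final class RuleLine {
    final String file;
    final String rule;        // ignored when endOfFile is true
    final boolean endOfFile;

    RuleLine(String file, String rule, boolean endOfFile) {
        this.file = file;
        this.rule = rule;
        this.endOfFile = endOfFile;
    }
}

/** Buffers lines per file and releases a file's rules only when its end marker arrives. */
final class EndOfFileBatcher {
    private final Map<String, List<String>> pending = new HashMap<>();

    /**
     * Returns the complete list of rules for a file when its end-of-file
     * marker is seen, and Optional.empty() for every ordinary line.
     */
    Optional<List<String>> accept(RuleLine line) {
        if (line.endOfFile) {
            List<String> done = pending.remove(line.file);
            // A marker with no buffered lines means the file was empty.
            return Optional.of(done == null ? new ArrayList<>() : done);
        }
        pending.computeIfAbsent(line.file, f -> new ArrayList<>()).add(line.rule);
        return Optional.empty();
    }
}
```

Keying the buffer by file name keeps two files uploaded close together from being mixed into one rule set, which is the weakness of the processing-time trigger described in the first message. Which of the two completed files should win is a separate decision that this sketch does not address.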


On Wed, 1 Jul 2020 at 08:21, Till Rohrmann <[hidden email]> wrote:
> [...]