Send ACK when all records of file are processed


Send ACK when all records of file are processed

Vinay Patil
Hi Guys,

Following is how my pipeline looks (DataStream API) :

[1] Read the data from the csv file
[2] KeyBy it by some id
[3] Do the enrichment and write it to DB

[1] reads the data in sequence as it has a parallelism of 1, and I have the default parallelism for the other operators.

I want to generate a response (ack) when all the data of the file has been processed. How can I achieve this?

One solution I can think of is to have a dummy EOF record in the file and a unique field shared by all the records in that file. Doing a keyBy on this field will make sure that all records are sent to a single slot. So, when the dummy EOF record is read, I can generate a response/ack.

Is there a better way to deal with this?


Regards,
Vinay Patil

Re: Send ACK when all records of file are processed

Piotr Nowojski
Hi,

As you figured out, a dummy EOF record is one solution; however, you might also achieve this by wrapping an existing CSV input function so that your wrapper emits this dummy EOF record. Another (probably better) idea is to use Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or ContinuousFileReaderOperator will emit it for you, so you would just need to handle the Watermark.

The question is: do you need to perform the ACK operation AFTER all of the DB writes, or just after reading the CSV file? If the latter, you could add a custom ACK operator with parallelism 1 just after the CSV source that waits for the EOF Watermark.

If it is the former (some kind of commit of the DB writes), you would need to wait until the EOF passes through all of your operators. You would need something like this:

parallelism 1 for source -> default parallelism for keyBy/enrichment/db writes -> parallelism 1 for ACK operator on Watermark(Long.MAX_VALUE)
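
For illustration only, a rough sketch of that topology in the DataStream API (EnrichmentMapper, WriteToDb and AckOperator are placeholder classes, not existing Flink ones; the source call is just an example):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.readTextFile("/path/to/file.csv")        // [1] source with parallelism 1
   .setParallelism(1)
   .keyBy(line -> line.split(",")[0])        // [2] keyBy some id, default parallelism
   .map(new EnrichmentMapper())              // [3] enrichment, default parallelism
   .flatMap(new WriteToDb())                 //     DB write that emits no records
   .transform("ack", TypeInformation.of(Void.class), new AckOperator())
   .setParallelism(1);                       // single ACK operator reacting to Watermark(Long.MAX_VALUE)

env.execute();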

I hope this helps,
Piotrek



Re: Send ACK when all records of file are processed

Vinay Patil
Hi Piotrek,

Thank you for your detailed answer.

Yes, I want to generate the ack when all the records of the file are written to DB.

So, to understand what you are saying: we will receive a single EOF watermark value at the ack operator once all the downstream operators have processed all the records of the file. But my understanding of watermarks is that each parallel instance of an operator emits its own watermark, so how do I ensure that EOF has been reached, or will I receive only one watermark at the ack operator?


So the pipeline topology will look like:

DataStream readFileStream = env.readFile()

readFileStream
        .transform(/* ContinuousFileReaderOperator */)
        .keyBy(0)
        .map(/* enrichment */)
        .addSink(/* DB */)

Instead of addSink, should it be a simple map operator which writes to the DB, so that we can have a subsequent ack operator which will generate the response?

Also, how do I get/access the Watermark value in the ack operator? It will be a simple map operator, right?





Regards,
Vinay Patil




Re: Send ACK when all records of file are processed

Piotr Nowojski
Hi,

If an operator has multiple inputs, its watermark will be the minimum of all of its inputs. Thus your hypothetical “ACK operator” will get Watermark(Long.MAX_VALUE) only when all of the preceding operators have reported Watermark(Long.MAX_VALUE).

Yes, instead of simply adding a sink, you would have to use something like a `flatMap` that doesn’t emit anything and only passes on the watermark (the default implementations do exactly that).
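
For example, something along these lines (EnrichedRecord and DbClient are placeholders for your own types, not Flink APIs):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class WriteToDb extends RichFlatMapFunction<EnrichedRecord, Void> {

    private transient DbClient db;

    @Override
    public void open(Configuration parameters) {
        db = new DbClient();          // one connection per parallel instance
    }

    @Override
    public void flatMap(EnrichedRecord record, Collector<Void> out) {
        db.write(record);             // side effect only; no records are emitted,
                                      // so only watermarks flow to the downstream ACK operator
    }

    @Override
    public void close() {
        if (db != null) {
            db.close();
        }
    }
}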

To access the watermark, you can use the DataStream.transform function and pass your own implementation of an operator extending AbstractStreamOperator. You would probably only need to override the processWatermark() method, and there you could perform the ACK operation once you get org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.
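
A minimal sketch of such an operator (sendAck() is a placeholder for whatever notification mechanism you use; the operator API may differ slightly between Flink versions):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class AckOperator extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<Void, Void> {

    @Override
    public void processElement(StreamRecord<Void> element) {
        // nothing to do here; the upstream flatMap emits no records
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);                             // keep the default forwarding
        if (mark.getTimestamp() == Watermark.MAX_WATERMARK.getTimestamp()) {
            sendAck();                                            // EOF has passed through all upstream operators
        }
    }

    private void sendAck() {
        // placeholder: e.g. call an HTTP endpoint or write a marker record
    }
}

You would attach it via transform("ack", TypeInformation.of(Void.class), new AckOperator()) with parallelism 1.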

Piotrek





Re: Send ACK when all records of file are processed

Piotr Nowojski
In the case of reading from input files, at EOF the readers will send Watermark(Long.MAX_VALUE) on all of their output edges, and those watermarks will be propagated accordingly. So your ACK operator will get Watermark(Long.MAX_VALUE) only when it receives it from ALL of its input edges.

When reading from Kafka, you do not have an EOF event, so it would not be possible to use this Watermark(Long.MAX_VALUE). In that case you would need to emit a dummy EOF record containing some meta information, like the filename, with its event time set to a value greater than that of the original event read from Kafka which contained the filename to process. You would have to pass this dummy EOF record to your ACK operator. There you would need to create some kind of mapping:

fileName -> event time marking EOF

Each time you process an EOF record, you add a new entry to this mapping. Then, whenever you process a watermark, you can check for which fileNames this watermark guarantees that the file has been processed completely.
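
As a rough illustration of that bookkeeping (ackFile() is a placeholder, and in a real job the map would live in the operator's checkpointed state):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class FileCompletionTracker {

    // fileName -> event time of the dummy EOF record for that file
    private final Map<String, Long> eofTimestamps = new HashMap<>();

    // call from processElement() whenever a dummy EOF record arrives
    public void onEofRecord(String fileName, long eofEventTime) {
        eofTimestamps.put(fileName, eofEventTime);
    }

    // call from processWatermark(): every file whose EOF timestamp is covered
    // by the watermark has been processed completely and can be ACKed
    public void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = eofTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= watermark) {
                ackFile(entry.getKey());
                it.remove();           // also keeps the mapping from growing without bound
            }
        }
    }

    private void ackFile(String fileName) {
        // placeholder: send the per-file ACK
    }
}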

However, this is more complicated and you would have to handle things like:
- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latencies (when reading from file, EOF immediately emits Watermark(Long.MAX_VALUE), which might not always be the case for Kafka: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission)

Piotrek

On 30 Jan 2018, at 15:17, Vinay Patil <[hidden email]> wrote:

Yeah, so this is the current implementation.

One question regarding the Watermark: since the watermark is chosen as the minimum value over all input streams, only one input stream will have its watermark set to Long.MAX_VALUE (which denotes the EOF), whereas the other streams will not have this value. Is my understanding right? In that case Long.MAX_VALUE will always be greater than the watermarks of the other input streams. Or will the Long.MAX_VALUE watermark flow from each input stream?


I was thinking of directly reading from Kafka as the source in Flink, in order to remove the middle layer of the independent Kafka consumer which triggers the Flink job.

So the pipeline will be: read from Kafka -> take the file location -> read the file using the FileReaderOperator.

But in this case, how do I determine for which file I have received the Long.MAX_VALUE? It will get complicated.



Regards,
Vinay Patil

On Tue, Jan 30, 2018 at 1:57 AM, Piotr Nowojski <[hidden email]> wrote:
Thanks for the clarification :)

Since you have one job per ACK, you can just rely on Watermark(Long.MAX_VALUE) to mark the end of the processing.

A more complicated solution (compared to what I proposed before) would be needed if you had one long-lived job (running for multiple weeks, for example) that needed to produce multiple ACKs at different points in time.

Piotrek


On 29 Jan 2018, at 15:43, Vinay Patil <[hidden email]> wrote:

Sure, here is the complete design that we have:

File metadata (the NFS location of the file) is stored in Kafka. We have a Kafka consumer (not the Flink one) which reads from each partition and triggers a Flink job on the cluster.

The Flink job will then read from the file and do the processing as I mentioned earlier.

The requirement here is that we need to trigger an ACK if the validations for all the records in a file are successful.

P.S. I know we are not using Kafka to its full potential and are just using it for storing metadata :)

Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 11:57 AM, Piotr Nowojski <[hidden email]> wrote:
Could you rephrase what your concern is?

Thanks, Piotrek


On 25 Jan 2018, at 18:54, Vinay Patil <[hidden email]> wrote:

Hi,

No, to clarify: I need to send the ack for each file when it is processed completely, and there are multiple files that I am going to read from the shared location.

Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 11:37 AM, Piotr Nowojski <[hidden email]> wrote:

> Yes, makes sense. Just looked at the code of ContinuousFileReaderOperator; it does not emit any Watermark in the processWatermark function. It only does it in the close function, so we will get the max value when all records are read.

Yes.

> What if I am reading multiple files from a shared location? In that case I will have to override processElement as well to generate an ack for that particular file, so the flatMap will simply write to DB and emit the fileName.

Yes, as long as you are OK with performing all of the fileName ACKs at the end (once all of the source files have been processed), and not ASAP.

> In the processWatermark function of the ACK operator, when the watermark is Long.MAX_VALUE I will collect the record, and the processElement function will then generate the ack based on some condition being met, right?

The other way around: in your ACK operator's processElement you would have to collect all of the fileNames in Flink state (ListState?) and process them in the ACK operator's processWatermark once it gets Long.MAX_VALUE.
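
Roughly like this (a sketch only; sendAck() is a placeholder, and the state/operator hooks may differ a bit between Flink versions):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class MultiFileAckOperator extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<String, Void> {

    private transient ListState<String> fileNames;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        fileNames = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("file-names", String.class));
    }

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        fileNames.add(element.getValue());   // upstream flatMap emits the fileName after each DB write
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        if (mark.getTimestamp() == Long.MAX_VALUE) {
            Set<String> distinct = new HashSet<>();
            for (String fileName : fileNames.get()) {
                distinct.add(fileName);      // deduplicate, since the fileName is emitted per record
            }
            for (String fileName : distinct) {
                sendAck(fileName);           // all inputs reached EOF, so every file has been written
            }
            fileNames.clear();
        }
    }

    private void sendAck(String fileName) {
        // placeholder: per-file ACK (HTTP call, message to Kafka, ...)
    }
}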

Piotrek
  
