Hi Guys,
Following is how my pipeline looks (DataStream API):

[1] Read the data from the CSV file
[2] KeyBy some id
[3] Do the enrichment and write it to the DB

[1] reads the data sequentially, since it has a parallelism of 1; the other operators use the default parallelism. I want to generate a response (ack) once all the data of the file has been processed. How can I achieve this?

One solution I can think of is to put a dummy EOF record at the end of the file and add a field that carries the same unique value for all records of that file. Doing a keyBy on this field makes sure that all records are sent to a single slot, so when the dummy EOF record is read I can generate the response/ack. Is there a better way to deal with this?

Regards,
Vinay Patil
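A minimal sketch of the dummy-EOF idea above (records stands for the DataStream<FileRecord> read from the file; FileRecord, fileId, isEof and sendAck() are illustrative names, not Flink API):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Every record of one file carries the same fileId; the producer appends a
// single record with isEof = true after the last real record of the file.
public class FileRecord {
    public String fileId;
    public boolean isEof;
    public String payload;
}

records
    .keyBy(r -> r.fileId)  // all records of one file end up in a single slot
    .flatMap(new FlatMapFunction<FileRecord, FileRecord>() {
        @Override
        public void flatMap(FileRecord r, Collector<FileRecord> out) {
            if (r.isEof) {
                // with a single-parallelism source and FIFO channels, every
                // earlier record of this file has already passed through here
                sendAck(r.fileId);  // placeholder for generating the response
            } else {
                out.collect(r);
            }
        }
    });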
Hi,
As you figured out, a dummy EOF record is one solution; however, you might also achieve this by wrapping an existing CSV input function, so that your wrapper emits the dummy EOF record. Another (probably better) idea is to use Watermark(Long.MAX_VALUE) as the EOF marker. The stream source and/or the ContinuousFileReaderOperator will emit it for you on EOF, so you would just need to handle that watermark.

The question is: do you need to perform the ACK operation after all of the DB writes, or just after reading the CSV file? If the latter, you could add a custom ACK operator with parallelism 1 right after the CSV source that waits for the EOF watermark. If the former (some kind of committing of the DB writes), you would need to wait until the EOF passes through all of your operators. You would need something like this:

parallelism 1 for the source -> default parallelism for keyBy/enrichment/DB writes -> parallelism 1 for the ACK operator reacting to Watermark(Long.MAX_VALUE)

I hope this helps,
Piotrek
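A rough sketch of that topology in the Java DataStream API (extractId and enrichAndWrite are placeholders; AckOperator is sketched further down the thread):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// [1] file source; the file monitoring part runs with parallelism 1
DataStream<String> lines = env.readTextFile("file:///path/to/input.csv");

lines
    .keyBy(line -> extractId(line))      // [2] keyBy some id
    .map(line -> enrichAndWrite(line))   // [3] enrichment + DB write, default parallelism
    .transform("file-ack",               // reacts to Watermark(Long.MAX_VALUE)
               BasicTypeInfo.STRING_TYPE_INFO,
               new AckOperator<String>())
    .setParallelism(1);

env.execute("csv-with-ack");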
Hi Piotrek,

Thank you for your detailed answer. Yes, I want to generate the ack when all the records of the file have been written to the DB.

So, to understand what you are saying: we will receive a single EOF watermark value at the ack operator once all the downstream operators have processed all the records of the file. But as I understand it, each parallel instance of an operator emits its own watermark, so how do I ensure that EOF has been reached? Or will I receive only one watermark at the ack operator?

So the pipeline topology will look like:

DataStream readFileStream = env.readFile();

readFileStream
    .transform(...)   // ContinuousFileReaderOperator
    .keyBy(0)
    .map(...)         // enrichment
    .addSink(...);    // DB

Instead of addSink, should it be a simple map operator which writes to the DB, so that we can have a subsequent ack operator which generates the response? Also, how do I get/access the watermark value in the ack operator? It will be a simple map operator, right?

Regards,
Vinay Patil

On Thu, Jan 25, 2018 at 4:50 AM, Piotr Nowojski <[hidden email]> wrote:
Hi,
If an operator has multiple inputs, its watermark will be the minimum of all of its inputs. Thus your hypothetical "ACK Operator" will get Watermark(Long.MAX_VALUE) only when ALL of the preceding operators have reported Watermark(Long.MAX_VALUE).

Yes, instead of simply adding a sink, you would have to use something like `flatMap` that doesn't emit anything and only passes the watermark through (the default implementations do exactly that).

To access the watermark, you can use the DataStream.transform function and pass your own implementation of an operator extending AbstractStreamOperator. Probably you would only need to override the processWatermark() method, and there you could perform the ACK operation once you get org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK.

Piotrek
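A minimal sketch of such an operator, assuming Flink 1.4-era APIs (the class name AckOperator and the sendAck() call are illustrative):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class AckOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // emit nothing: this operator only reacts to the watermark
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);  // keep the default watermark forwarding
        if (mark.getTimestamp() == Watermark.MAX_WATERMARK.getTimestamp()) {
            // reached only once ALL input edges have reported Long.MAX_VALUE,
            // i.e. every record of the file has passed through the pipeline
            sendAck();  // placeholder for producing the response
        }
    }

    private void sendAck() {
        // hypothetical: write the ack wherever the response needs to go
    }
}

It would be wired in via transform (enriched being the stream coming out of the DB-writing flatMap), for example:

enriched.transform("ack", enriched.getType(), new AckOperator<>()).setParallelism(1);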
In the case of reading from input files, on EOF the readers will send Watermark(Long.MAX_VALUE) on all of their output edges, and those watermarks will be propagated accordingly. So your ACK operator will get Watermark(Long.MAX_VALUE) only when it has received it from ALL of its input edges.
When reading from Kafka you do not have an EOF event, so it would not be possible to use Watermark(Long.MAX_VALUE) there. In that case you would need to emit some dummy EOF record, containing meta information such as the filename, with its event time set to a value greater than that of the original events read from Kafka for that file. You would have to pass this dummy EOF record on to your ACK operator. There you would need to maintain some kind of mapping fileName -> event time marking EOF, and each time you process an EOF record you add a new entry to this mapping. Then, whenever you process a watermark, you can check for which fileNames this watermark guarantees that the file has been processed completely. However, this is more complicated, and you would have to handle things like:

- cleaning up the mapping (avoiding OutOfMemory)
- making sure that watermarks are generated without unnecessary latency (when reading from a file, EOF immediately triggers Watermark(Long.MAX_VALUE), which might not always be the case for Kafka: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission)

Piotrek
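A hedged sketch of that fileName -> EOF-event-time bookkeeping (EofMarker, its fields, and ack() are illustrative; a real job would also need to keep the map in checkpointed state so it survives failures):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// EofMarker is a hypothetical dummy record (fileName, eofTimestamp), where
// eofTimestamp is greater than the event time of every record of that file.
public class FileAckOperator extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<EofMarker, Void> {

    // fileName -> event time marking that file's EOF
    private final Map<String, Long> pendingFiles = new HashMap<>();

    @Override
    public void processElement(StreamRecord<EofMarker> element) throws Exception {
        EofMarker m = element.getValue();
        pendingFiles.put(m.fileName, m.eofTimestamp);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        // a watermark at or past the EOF timestamp guarantees that all of the
        // file's records (which carry smaller event times) have been processed
        Iterator<Map.Entry<String, Long>> it = pendingFiles.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (mark.getTimestamp() >= e.getValue()) {
                ack(e.getKey());  // placeholder for the per-file response
                it.remove();      // clean up the mapping to avoid OutOfMemory
            }
        }
    }

    private void ack(String fileName) {
        // hypothetical ack for one completely processed file
    }
}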