Hello,

I'm migrating my Spark-based stream processing application to Flink (Calcite SQL and temporal tables look too attractive to resist).

My Spark app works as follows:
- the application is started periodically
- it reads a directory of Parquet files as a stream
- SQL transformations are applied
- the resulting append stream is written to another directory
- it runs until all available data is processed
- checkpoints its state
- and **exits**
- upon the next run it resumes where it left off, processing only new data

I'm having difficulties replicating this start-stop-resume behavior with Flink.

When I set up my input stream using:

env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)

... I get an infinite stream, but the application will naturally keep running until aborted manually.

When I use FileProcessingMode.PROCESS_ONCE, the application exits after exhausting all inputs, but it seems that Flink also treats the end of the stream as a max watermark. So, for example, it will close all tumbling windows that I don't want closed yet, since more data will arrive on the next run.

Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can I still trigger a savepoint when env.execute() returns?

Alternatively, if I use PROCESS_CONTINUOUSLY along with env.executeAsync(), is there a way for me to detect when the file stream has been exhausted so that I can call job.stopWithSavepoint()?

Thanks for your help!
- Sergii
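The window problem described in the question can be reproduced with a toy model. The sketch below is not Flink code: `TumblingWindowSketch` and its methods are hypothetical names, timestamps are assumed to be non-negative day numbers, and the firing rule (a window fires once the watermark reaches its last timestamp) is a simplification of event-time tumbling windows. It only shows why a final maximum watermark closes a window that later runs would still have added data to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows; an illustration, not Flink's implementation. */
final class TumblingWindowSketch {
    private final long size;
    // Window start -> number of events assigned to that window so far.
    private final TreeMap<Long, Integer> open = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();

    TumblingWindowSketch(long size) {
        this.size = size;
    }

    /** Assigns an event to the window [start, start + size) that contains its timestamp. */
    void onEvent(long timestamp) {
        long start = timestamp - (timestamp % size);
        open.merge(start, 1, Integer::sum);
    }

    /** Fires, in order, every open window whose last timestamp is at or before the watermark. */
    void onWatermark(long watermark) {
        while (!open.isEmpty()) {
            Map.Entry<Long, Integer> oldest = open.firstEntry();
            long end = oldest.getKey() + size;
            if (end - 1 > watermark) {
                break; // this window may still receive data
            }
            fired.add("[" + oldest.getKey() + "," + end + ") count=" + oldest.getValue());
            open.pollFirstEntry();
        }
    }

    List<String> fired() {
        return fired;
    }

    int openWindows() {
        return open.size();
    }

    public static void main(String[] args) {
        TumblingWindowSketch weekly = new TumblingWindowSketch(7);
        weekly.onEvent(0);
        weekly.onEvent(3);
        weekly.onEvent(8);
        weekly.onWatermark(8);              // only the first week is complete
        System.out.println(weekly.fired());
        weekly.onWatermark(Long.MAX_VALUE); // end of input: the partial second week fires too
        System.out.println(weekly.fired());
    }
}
```

With a regular watermark the second week stays open; the maximum watermark forces it out with only the data seen so far, which is the behavior the question wants to avoid between runs.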
Hi Sergii,

If I understand correctly, you want to process all the files in some directory and do not want to process them multiple times. I'm not sure whether using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of `FileProcessingMode#PROCESS_ONCE` [1] and keeping the job running 24/7 would satisfy your needs. But be careful: in `FileProcessingMode#PROCESS_CONTINUOUSLY` mode, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.

Best,
Congxian

Sergii Mikhtoniuk <[hidden email]> wrote on Mon, May 18, 2020 at 5:47 AM:
Hi Sergii,

your requirements feel a bit odd. It's neither batch nor streaming.

Could you tell us why it's not possible to let the job run as a streaming job that runs continuously? Is it just a matter of saving costs?

If so, you could monitor the number of records being processed and trigger stop/cancel-with-savepoint accordingly.

On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <[hidden email]> wrote:
--
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
Hi,
Actually, it seems that Spark's dynamic allocation would save more resources in that case.
From: Arvid Heise <[hidden email]>
Sent: Monday, May 18, 2020 11:15:09 PM
To: Congxian Qiu <[hidden email]>
Cc: Sergii Mikhtoniuk <[hidden email]>; user <[hidden email]>
Subject: Re: Process available data and stop with savepoint
Thanks all for responding.

To give a bit more context:
- I'm building a tool that performs *fully deterministic* stream processing of mission-critical data
- all input data is in the form of an append-only event log (Parquet files)
- users define streaming SQL transformations to do all kinds of analysis of those event logs (joins, enrichment, aggregating events into reports etc.)
- results are also written as append-only event logs

I think this use case fits very well into the "batch is a special case of streaming" idea. Even though I have the full data history on disk, I want the SQL queries users write to be fully agnostic of how data is ingested, how it is stored, how frequently new data arrives, or how frequently it's processed.

Say, for example, you want to generate a weekly summary report of COVID-19 cases per country:
- You could write a batch job that reads the last processed week end date from the previous output, checks if one full week has passed since then, checks that all input data sources have already posted full data for that week, and finally filters and aggregates the data.
- ...But isn't it much more elegant to express it as a tumbling window aggregation, where watermarks do all the hard work for you?

This kind of "write the query once and run it forever" is what I'm aiming for and why I'm not using batch processing.

As for why I want the start-stop-continue behavior: most data I'm dealing with is low volume and low frequency. Think of the open datasets you find on government data portals, e.g. property assessment data, zoning, transit lines. All of those are updated very infrequently, so if I run the application constantly it will be idle 99% of the time. Thus I use the "pull" model, where the user runs the app to update some query result to the latest available data when necessary.

I realize that this kind of usage is very different from how Flink is usually deployed, but imho it's not too far-fetched.
Going back to the specific problems that I encountered:

It seems that it is not possible to use Flink 1.10 for *unbounded* file streams at all. When reading files with FileProcessingMode.PROCESS_CONTINUOUSLY, the following line emits the MAX_WATERMARK into the stream even when env.stopWithSavepoint() is called, prematurely closing my tumbling windows:
https://github.com/apache/flink/blob/release-1.10.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L222

I had to implement my own ContinuousFileReaderOperator as a workaround.

> @Arvid: You could monitor the number of records being processed and trigger stop/cancel-with-savepoint accordingly.

I was thinking in a similar direction, but got stuck on which records I should be counting, and how to count them in the case of a JOIN. If I have two or more input streams and some arbitrary SQL transforms, the number of rows in the output may be different from the number of rows read.

I'm curious how Flink handles in-flight data during env.stopWithSavepoint():
- Will it wait for records to propagate through the topology? ...In this case all I need is to ensure that the reader has read all available data before calling stop.
- ...Or will in-flight records become part of the savepoint? ...In this case I'll need to think of a way to make sure that not only reading but all processing has finished too.

Is there perhaps a way to send other types of special messages, like Watermarks, through the streams without them being treated as data? I wonder if I could send some special "InputWillBlock" message from the file readers and wait for it to propagate to the output to understand that processing has finished.

Again, thanks everyone for your help!
- Sergii

On Mon, May 18, 2020 at 8:45 AM Thomas Huang <[hidden email]> wrote:
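The "InputWillBlock" idea from the message above can be pictured with a small model. This is only a sketch under strong assumptions: one source, one operator and one sink connected by FIFO channels, with a hypothetical marker object travelling in-band. `MarkerPipelineSketch` and all of its members are made-up names, and nothing here corresponds to a public Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.UnaryOperator;

/** Toy pipeline: source -> operator -> sink over FIFO channels, with an in-band marker. */
final class MarkerPipelineSketch {
    /** Hypothetical control element; it is forwarded as-is and never treated as data. */
    private static final Object INPUT_WILL_BLOCK = new Object();

    private final Queue<Object> sourceToOperator = new ArrayDeque<>();
    private final Queue<Object> operatorToSink = new ArrayDeque<>();
    private final UnaryOperator<String> transform;
    private final List<String> sinkOutput = new ArrayList<>();

    MarkerPipelineSketch(UnaryOperator<String> transform) {
        this.transform = transform;
    }

    void emitRecord(String record) {
        sourceToOperator.add(record);
    }

    /** The source announces that it has no more data available for now. */
    void emitMarker() {
        sourceToOperator.add(INPUT_WILL_BLOCK);
    }

    /** Runs operator and sink until the marker reaches the sink; false if the channels drain first. */
    boolean runUntilMarker() {
        while (true) {
            Object atSink = operatorToSink.poll();
            if (atSink == INPUT_WILL_BLOCK) {
                return true; // everything emitted before the marker is already in sinkOutput
            }
            if (atSink != null) {
                sinkOutput.add((String) atSink);
                continue;
            }
            Object atOperator = sourceToOperator.poll();
            if (atOperator == null) {
                return false;
            }
            // Records are transformed; the marker is passed through untouched, keeping its position.
            operatorToSink.add(atOperator == INPUT_WILL_BLOCK ? atOperator : transform.apply((String) atOperator));
        }
    }

    List<String> sinkOutput() {
        return sinkOutput;
    }

    int inFlight() {
        return sourceToOperator.size() + operatorToSink.size();
    }

    public static void main(String[] args) {
        MarkerPipelineSketch pipeline = new MarkerPipelineSketch(s -> s.toUpperCase());
        pipeline.emitRecord("a");
        pipeline.emitRecord("b");
        pipeline.emitMarker();
        System.out.println(pipeline.runUntilMarker() + " " + pipeline.sinkOutput());
    }
}
```

Because each channel preserves order, the marker cannot overtake records emitted before it, so seeing it at the sink implies those records were fully processed. A topology with joins or several inputs would need the marker from every input before drawing that conclusion, which this sketch does not model.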
I also previously had some low-volume data sources that I wanted to process, and I was always convinced that the proper solution would be to have auto-scaling and just decrease the used resources as much as possible (which is not trivial because of state rescaling). But thinking a bit further, it would even need to scale down to 0 to really meet the need for some kind of hibernation mode (maybe some K8s operator that implements a duality of real cluster and cron job?).

Unfortunately, scaling down won't be in Flink before 1.12, and scaling up is still limited. The vision of hibernation would be even further down the road (possibly never).

So for the time being, your approach definitely looks correct to me, save that I would take batch out of the equation. You would actually have mini-streams and not mini-batches.

Now to your questions:

1) I'd be pragmatic: sum the numBytesOut metric on all operators and, if it's 0 after 10 min, assume processing is done.

2) Re ContinuousFileReaderOperator.java#L222: this line is only called on close, which in turn is only called for PROCESS_ONCE (and not during stop/cancel with savepoint). I can go into details if you are interested. Stop with savepoint was previously called cancel with savepoint, exactly because it cancels all tasks (but I guess it was deemed too technical).

3) For both stopWithSavepoint and the REST API call stop [1], you actually have the option advanceToEndOfEventTime (a flag indicating whether the source should inject a MAX_WATERMARK into the pipeline). Set it to false and no window will be fired.

4) In both cases checkpoint barriers are inserted at the source and then trickle through the graph. At each point when they reach an operator, the respective state is stored. So if you have read all data at the sources and then trigger a checkpoint/savepoint, it is guaranteed that no unprocessed data is left when the checkpoint/savepoint is finished, as no records are overtaken by the barrier (for unaligned checkpoints that's a different story). That means you can trigger your hibernation savepoint after all sources have been processed.

HOWEVER, if you use PROCESS_ONCE, things unfortunately don't work. Because the source closes itself after having read the last file, a checkpoint barrier will not be propagated through the source and thus the checkpoint never completes. So it's currently absolutely mandatory to use PROCESS_CONTINUOUSLY.

5) That is an interesting line of thinking. We have an idle StreamStatus that is also propagated downstream, but it's currently not public API. It's currently propagated until the OperatorChain and then just used on the output side.

On Mon, May 18, 2020 at 8:48 PM Sergii Mikhtoniuk <[hidden email]> wrote:
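The pragmatic check from point 1 of the reply above could look roughly like the following. It is a sketch, not a tested integration: `IdleDetector` is a hypothetical helper, the `LongSupplier` stands in for however the summed numBytesOut value is obtained, and "it's 0 after 10 min" is interpreted here as "the summed counter has not grown for 10 minutes", which is an assumption about what was meant.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Reports idleness once a monotonically growing counter has stopped changing for a full timeout. */
final class IdleDetector {
    // Hypothetical source of the metric, e.g. the sum of numBytesOut over all operators.
    private final LongSupplier totalBytesOut;
    private final long idleTimeoutMillis;
    private long lastValue;
    private long lastChangeMillis;

    IdleDetector(LongSupplier totalBytesOut, Duration idleTimeout, long nowMillis) {
        this.totalBytesOut = totalBytesOut;
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.lastValue = totalBytesOut.getAsLong();
        this.lastChangeMillis = nowMillis;
    }

    /** Call periodically with the current time; returns true once no output was seen for the timeout. */
    boolean poll(long nowMillis) {
        long current = totalBytesOut.getAsLong();
        if (current != lastValue) {
            // Output is still being produced: remember the new value and restart the idle clock.
            lastValue = current;
            lastChangeMillis = nowMillis;
            return false;
        }
        return nowMillis - lastChangeMillis >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        AtomicLong fakeMetric = new AtomicLong(); // stands in for the real metric in this demo
        IdleDetector detector = new IdleDetector(fakeMetric::get, Duration.ofMinutes(10), 0L);
        fakeMetric.set(4096);
        System.out.println(detector.poll(60_000L));    // counter changed, so not idle
        System.out.println(detector.poll(1_000_000L)); // unchanged for more than 10 minutes
    }
}
```

In a real deployment the supplier would read the job's metrics, and a `true` result would be followed by a stop-with-savepoint call with advanceToEndOfEventTime set to false, as described in point 3; both parts are left out here. Passing the clock in as a parameter keeps the logic easy to test without waiting ten minutes.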