Hi Flink user group,
*Background*
I'm changing a Flink SQL job to use the DataStream API. I'm updating an existing MiniCluster test in my code; it has a similar structure to other tests in flink-tests. I call StreamExecutionEnvironment.execute. My tests sink to a local tmp directory using StreamingFileSink with bulk formats.

*Issue*
When I check the files on local disk, I see ".part-0-0.inprogress.1234abcd-5678-uuid...".

*Question*
What's the best way to get the test to complete (finalize) the output files? I tried checkpointing very frequently, sleeping, etc., but these didn't work.

Thanks!
- Dan
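For the verification step of such a test, it can help to assert explicitly on which part files were finalized. Below is a minimal, stdlib-only Java sketch; the PartFiles class and its methods are hypothetical helpers, not Flink APIs. It assumes the default naming seen above, where unfinished files are hidden (leading dot, e.g. ".part-0-0.inprogress.<uuid>") and finished ones are not (e.g. "part-0-0"), and it also walks bucket subdirectories.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical test helper: lists part files written by a file sink below outputDir,
 * including bucket subdirectories. Assumes default naming, where unfinished files are
 * hidden (".part-0-0.inprogress.<uuid>") and finished ones are not ("part-0-0").
 */
final class PartFiles {
    private PartFiles() {}

    /** Finished (committed) part files, sorted by path. */
    static List<Path> finished(Path outputDir) throws IOException {
        return list(outputDir, false);
    }

    /** Hidden in-progress/pending files that were never committed, sorted by path. */
    static List<Path> unfinished(Path outputDir) throws IOException {
        return list(outputDir, true);
    }

    private static List<Path> list(Path outputDir, boolean hidden) throws IOException {
        // Files.walk holds open directory handles, so close the stream when done.
        try (Stream<Path> paths = Files.walk(outputDir)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().startsWith(".") == hidden)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }
}
```

In a test you would typically assert that `PartFiles.unfinished(dir)` is empty and then read the files from `PartFiles.finished(dir)` to compare them with the expected records.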
Hi Dan,
I'm afraid this is not easily possible using the DataStream API in STREAMING execution mode today. However, there is one possible solution, and we're introducing changes that will also make this work in STREAMING mode.

The possible solution is to use the `FileSink` instead of the `StreamingFileSink`. This is an updated version of the sink that works in both BATCH and STREAMING mode (see [1]). If you use BATCH execution mode, all your files should be "completed" at the end.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html

The thing we're currently working on is FLIP-147 [2], which will allow sinks (and other operators) to always do one final checkpoint before shutting down. This will allow them to move the last outstanding in-progress files over to finished as well.

[2] https://cwiki.apache.org/confluence/x/mw-ZCQ

I hope that helps!

Best,
Aljoscha

On 2021/02/04 21:37, Dan Hill wrote:
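To see why the last file stays in-progress, here is a toy model in plain Java (not Flink code; ToyBulkSink and its methods are hypothetical). It assumes the behavior of bulk formats: a part file is rolled at every checkpoint and only committed (renamed to its final name) once that checkpoint is confirmed complete. Records written after the last completed checkpoint therefore stay in an in-progress file unless a final checkpoint runs before shutdown, which is what FLIP-147 adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model (NOT Flink code) of a bulk-format file sink moving part files through
 * IN_PROGRESS -> PENDING -> FINISHED. Assumption: files roll on every checkpoint
 * and are committed only when that checkpoint is confirmed complete.
 */
class ToyBulkSink {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    final List<State> files = new ArrayList<>();
    private int open = -1; // index of the currently open in-progress file, -1 if none
    private final TreeMap<Long, List<Integer>> pendingByCheckpoint = new TreeMap<>();

    /** Writing a record opens a new in-progress file if none is open. */
    void write() {
        if (open < 0) {
            files.add(State.IN_PROGRESS);
            open = files.size() - 1;
        }
    }

    /** Checkpoint snapshot: close the open file and remember it as pending. */
    void snapshot(long checkpointId) {
        List<Integer> rolled = new ArrayList<>();
        if (open >= 0) {
            files.set(open, State.PENDING);
            rolled.add(open);
            open = -1;
        }
        pendingByCheckpoint.put(checkpointId, rolled);
    }

    /** Checkpoint-complete notification: commit everything pending up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<Integer>> done = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<Integer> ids : done.values()) {
            for (int i : ids) {
                files.set(i, State.FINISHED);
            }
        }
        done.clear(); // headMap is a view, so this removes the committed entries
    }
}
```

In this model, checkpointing more often only shrinks the uncommitted tail; it cannot remove it, which is consistent with what Dan observed.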
Thanks Aljoscha!

On Fri, Feb 5, 2021 at 1:48 AM Aljoscha Krettek <[hidden email]> wrote:
Ah, it looks like I need Flink 1.12 for this. I'm still on 1.11.

On Fri, Feb 5, 2021, 08:37 Dan Hill <[hidden email]> wrote:
I changed the test to use ExecutionMode.BATCH in v1.11 and it still doesn't work. How did developers write MiniCluster tests for similar code before this? Or did they not?

On Sat, Feb 6, 2021 at 5:38 PM Dan Hill <[hidden email]> wrote:
Hi Dan,

It's not entirely clear to me how you want to write your tests, but it's possible with your setup (we have a couple of thousand tests in Flink that do that).

What you usually want is a finite test source (e.g., a file source that does not scan for new files), so that the streaming job eventually finishes gracefully. Then you can compare the actual output with the expected output. We also often just add a FiniteTestSource to tests to produce a defined set of data.

You can also start the job with env.executeAsync and cancel it after a few seconds.

Lastly, you can throw a flag exception (we often use SuccessException for that) to break out of the execution.

If you'd rather verify things on the fly, the common pattern is to add an operator/sink that already performs the verification.

Happy to answer further questions.

Arvid

On Sun, Feb 7, 2021 at 3:14 AM Dan Hill <[hidden email]> wrote:
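The "flag exception" approach can be sketched as follows. This is plain Java with hypothetical names (SuccessException is defined locally here; SentinelRunner and runUntilSuccess are illustrative, not Flink APIs): a verifying sink or operator throws the sentinel once it has seen the expected data, the job fails, and the test treats a failure whose cause chain contains the sentinel as success, while rethrowing anything else.

```java
import java.util.concurrent.Callable;

/** Hypothetical sentinel thrown by a verifying operator once the expected output was seen. */
class SuccessException extends RuntimeException {
    SuccessException() {
        super("expected output observed; stopping the job");
    }
}

/** Illustrative helper, not a Flink API. */
class SentinelRunner {
    /** True if the sentinel appears anywhere in the cause chain (job failures are often wrapped). */
    static boolean causedBySuccess(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    /** Runs a blocking job; a sentinel failure counts as success, any other failure is rethrown. */
    static void runUntilSuccess(Callable<?> job) throws Exception {
        try {
            job.call();
        } catch (Exception e) {
            if (!causedBySuccess(e)) {
                throw e;
            }
        }
    }
}
```

In a real test the Callable would wrap the blocking call, e.g. `SentinelRunner.runUntilSuccess(() -> env.execute("my-test"));`. Because the sentinel is a job failure, the job should not be restarted; with checkpointing enabled you would likely also set `env.setRestartStrategy(RestartStrategies.noRestart())` so the exception reaches the test instead of triggering restarts.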