Minicluster Flink tests, checkpoints and inprogress part files

Dan (2021/02/04 21:37)
Hi Flink user group,

Background
I'm changing a Flink SQL job to use the DataStream API, and I'm updating an existing MiniCluster test in my code. It has a similar structure to other tests in flink-tests. I call StreamExecutionEnvironment.execute. My tests sink to a temporary directory on local disk using StreamingFileSink with bulk formats.

Issue
When I check the files on local disk, I see in-progress files such as ".part-0-0.inprogress.1234abcd-5678-uuid...".

Question
What's the best way to get the test to finalize its output files? I tried checkpointing very frequently, sleeping, etc., but none of that worked.

Thanks!
- Dan

Re: Minicluster Flink tests, checkpoints and inprogress part files

Aljoscha Krettek (Fri, Feb 5, 2021 at 1:48 AM)
Hi Dan,

I'm afraid this is not easily possible using the DataStream API in
STREAMING execution mode today. However, there is one possible solution,
and we're introducing changes that will make this work in STREAMING
mode as well.

The possible solution is to use the `FileSink` instead of the
`StreamingFileSink`. This is an updated version of the sink that works
in both BATCH and STREAMING mode (see [1]). If you use BATCH execution
mode, all your files should be "completed" at the end.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html

What we're currently working on is FLIP-147 [2], which will allow
sinks (and other operators) to always take one final checkpoint before
shutting down. This will allow them to move the last outstanding
in-progress files to finished as well.

[2] https://cwiki.apache.org/confluence/x/mw-ZCQ

I hope that helps!

Best,
Aljoscha
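To see why frequent checkpoints alone did not help, here is a toy model of the part-file lifecycle described above, written in plain Java rather than against Flink's API. It is a simplified sketch under stated assumptions, not Flink's implementation: the class and method names are hypothetical, real in-progress names also carry a UUID suffix, and the model assumes that bulk formats roll the open file on every checkpoint and that only a checkpoint that has completed commits pending files. Under those assumptions, a file written after the last completed checkpoint stays in progress unless something commits it at end of input, which is the role played by BATCH mode with `FileSink` or by the final checkpoint from FLIP-147.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink code: how a bulk-format file sink might move part files
// from in-progress to finished. All names here are hypothetical.
final class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        long committedBy = -1; // checkpoint whose completion commits this file

        PartFile(String name) { this.name = name; }

        // Until it is committed, the file keeps a hidden in-progress name on disk
        // (the real name also has a UUID suffix, omitted here).
        String nameOnDisk() {
            return state == State.FINISHED ? name : "." + name + ".inprogress";
        }
    }

    final List<PartFile> files = new ArrayList<>();
    private PartFile open;
    private int counter;

    // Writing a record opens a new part file if none is open (contents are not modelled).
    void write(String record) {
        if (open == null) {
            open = new PartFile("part-0-" + counter++);
            files.add(open);
        }
    }

    // Assumed bulk-format behaviour: every checkpoint rolls the open file,
    // which becomes pending until that checkpoint completes.
    void snapshot(long checkpointId) {
        if (open != null) {
            open.state = State.PENDING;
            open.committedBy = checkpointId;
            open = null;
        }
    }

    // Only a completed checkpoint commits (renames) pending files.
    void checkpointComplete(long checkpointId) {
        for (PartFile f : files) {
            if (f.state == State.PENDING && f.committedBy <= checkpointId) {
                f.state = State.FINISHED;
            }
        }
    }

    // finalCommit == false models STREAMING mode without FLIP-147: nothing happens,
    // so the last file is never committed. true models a final commit at end of input.
    void endOfInput(boolean finalCommit) {
        if (finalCommit) {
            snapshot(Long.MAX_VALUE);
            checkpointComplete(Long.MAX_VALUE);
        }
    }

    List<String> namesOnDisk() {
        List<String> names = new ArrayList<>();
        for (PartFile f : files) {
            names.add(f.nameOnDisk());
        }
        return names;
    }
}
```

In a run that writes, completes checkpoint 1, writes again and then ends without a final commit, the model leaves `part-0-0` finished and `.part-0-1.inprogress` behind, which matches the symptom in the original question; with a final commit at end of input, every file ends up finished.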

Re: Minicluster Flink tests, checkpoints and inprogress part files

Dan (Fri, Feb 5, 2021, 08:37)
Thanks, Aljoscha!

Re: Minicluster Flink tests, checkpoints and inprogress part files

Dan (Sat, Feb 6, 2021 at 5:38 PM)
Ah, it looks like I need to use 1.12 for this. I'm still on 1.11.

Re: Minicluster Flink tests, checkpoints and inprogress part files

Dan (Sun, Feb 7, 2021 at 3:14 AM)
I changed the test to use ExecutionMode.BATCH in v1.11, and it still doesn't work. How did developers write MiniCluster tests for similar code before? Or did they not?

Re: Minicluster Flink tests, checkpoints and inprogress part files

Arvid Heise
Hi Dan,

It's not entirely clear to me how you want to write your tests, but it is possible with your setup (we have a couple of thousand tests in Flink that do that).

What you usually want is a finite test source (e.g. a file source that does not keep scanning for new files), so that the streaming job eventually finishes gracefully. Then you can compare the output against the expected output. We also often just add a FiniteTestSource to tests to produce a defined set of data.
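Once the job has finished gracefully, a test still has to check what actually reached the output directory. The helper below is a hypothetical sketch, not a Flink utility. It assumes the default naming seen earlier in the thread: committed files are visible and named like part-<subtask>-<counter>, while unfinished ones are hidden files such as .part-0-0.inprogress.<uuid>. A test would assert that `inProgress(...)` is empty and then hand `committed(...)` to a reader for its bulk format (e.g. Parquet) to compare against the expected records.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical test helper (not part of Flink). Assumes the default part-file
// naming: committed files look like "part-<subtask>-<counter>", unfinished ones
// are hidden files like ".part-0-0.inprogress.<uuid>".
final class PartFiles {
    private PartFiles() {}

    static boolean isHidden(Path file) {
        return file.getFileName().toString().startsWith(".");
    }

    static boolean isInProgress(Path file) {
        return isHidden(file) && file.getFileName().toString().contains(".inprogress");
    }

    // Committed (visible) files anywhere under outputDir, including bucket subdirectories.
    static List<Path> committed(Path outputDir) throws IOException {
        return find(outputDir, p -> !isHidden(p));
    }

    // Files the sink never committed; a test whose job finished cleanly should see none.
    static List<Path> inProgress(Path outputDir) throws IOException {
        return find(outputDir, PartFiles::isInProgress);
    }

    private static List<Path> find(Path dir, Predicate<Path> keep) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths.filter(Files::isRegularFile)
                    .filter(keep)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }
}
```

Walking the whole tree matters because the sink writes into bucket subdirectories rather than directly into the output path.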

You can also start the job with env.executeAsync and cancel it after a few seconds. Lastly, you can throw a flag exception (we often use SuccessException for that) to break out of the execution.

If you'd rather verify things on the fly, the common pattern is to add an operator or sink that performs the verification itself.
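The SuccessException and on-the-fly verification patterns can be illustrated without Flink at all. In this plain-Java sketch, `VerifyingSink` stands in for a verifying sink and `runJob` for `env.execute()`. Both names are hypothetical, and the exception wrapping in `runJob` only roughly mimics how a failed job reports the original exception as a cause. Because the check walks the whole cause chain, the test passes even though the "job" reads from an endless source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the "verify in the sink, then break out" pattern; no Flink classes.
final class VerifyOnTheFly {
    private VerifyOnTheFly() {}

    // Test-only marker exception: thrown once the expected output has been seen.
    static final class SuccessException extends RuntimeException {
        SuccessException() { super("expected output observed"); }
    }

    // Hypothetical verifying sink: checks each record as it arrives and stops the job
    // once as many records as expected have been seen (assumes no duplicates).
    static final class VerifyingSink implements Consumer<String> {
        private final List<String> expected;
        private final List<String> seen = new ArrayList<>();

        VerifyingSink(List<String> expected) { this.expected = expected; }

        @Override
        public void accept(String record) {
            if (!expected.contains(record)) {
                throw new IllegalStateException("unexpected record: " + record);
            }
            seen.add(record);
            if (seen.size() == expected.size()) {
                throw new SuccessException();
            }
        }
    }

    // Hypothetical stand-in for env.execute(): runs until the source is exhausted
    // and reports any failure wrapped in another exception.
    static void runJob(Iterable<String> source, Consumer<String> sink) {
        try {
            for (String record : source) {
                sink.accept(record);
            }
        } catch (RuntimeException e) {
            throw new RuntimeException("Job execution failed", e);
        }
    }

    // The test counts as passed if SuccessException appears anywhere in the cause chain.
    static boolean endedWithSuccess(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }
}
```

Throwing from the sink is what ends the otherwise unbounded job; the executeAsync-and-cancel variant instead relies on a fixed wait, which makes the test's runtime and outcome depend on timing.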

Happy to answer further questions.

Arvid
