Missing support for `TestStreamEnvironment#executeAsync`

Bob Tiernay
Hi all,

I have been trying to test a Flink 1.11 streaming job with the `DataStreamUtils#collect` utility in a `MiniCluster`-based test, and I ran into an issue.

`TestStreamEnvironment` does not override `executeAsync`. So when `DataStreamUtils#collect` calls `env.executeAsync("Data Stream Collect")`, the call falls through to the inherited `StreamExecutionEnvironment#executeAsync` implementation. This is problematic because that implementation creates a brand-new `MiniCluster` when it reaches the following lines:

```java
CompletableFuture<JobClient> jobClientFuture = executorFactory
    .getExecutor(configuration)
    .execute(streamGraph, configuration);
```

As a result, any configuration set up for the test cluster is not respected by the collect job. Is this expected behavior?
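The dispatch problem can be reproduced in plain Java, without Flink. The sketch below is a toy model under stated assumptions: `FakeCluster`, `BaseEnvironment`, `TestEnvironment`, `FixedTestEnvironment` and `collect` are hypothetical names standing in for `MiniCluster`, `StreamExecutionEnvironment`, `TestStreamEnvironment` and `DataStreamUtils#collect`. It models only Java method overriding, not Flink's executor machinery, and `FixedTestEnvironment` is one illustrative shape of a fix, not the actual patch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the dispatch problem. None of these classes are Flink classes:
// FakeCluster stands in for a MiniCluster, BaseEnvironment for
// StreamExecutionEnvironment, and TestEnvironment for TestStreamEnvironment.
public class ExecuteAsyncDispatchDemo {

    /** Stand-in for a MiniCluster: remembers the jobs submitted to it. */
    static final class FakeCluster {
        final String name;
        final List<String> jobs = new ArrayList<>();

        FakeCluster(String name) {
            this.name = name;
        }

        String submit(String jobName) {
            jobs.add(jobName);
            return name; // report which cluster ran the job
        }
    }

    /** Base environment: executeAsync always starts a brand-new cluster. */
    static class BaseEnvironment {
        int clustersCreated = 0;

        String execute(String jobName) {
            return executeAsync(jobName);
        }

        String executeAsync(String jobName) {
            clustersCreated++;
            return new FakeCluster("fresh-cluster").submit(jobName);
        }
    }

    /** Test environment that only redirects execute() to the shared cluster. */
    static class TestEnvironment extends BaseEnvironment {
        final FakeCluster testCluster;

        TestEnvironment(FakeCluster testCluster) {
            this.testCluster = testCluster;
        }

        @Override
        String execute(String jobName) {
            return testCluster.submit(jobName);
        }
        // executeAsync is inherited unchanged, so it bypasses testCluster.
    }

    /** Hypothetical fix: also route executeAsync to the shared cluster. */
    static class FixedTestEnvironment extends TestEnvironment {
        FixedTestEnvironment(FakeCluster testCluster) {
            super(testCluster);
        }

        @Override
        String executeAsync(String jobName) {
            return testCluster.submit(jobName);
        }
    }

    /** Stand-in for a helper such as collect() that always uses executeAsync. */
    static String collect(BaseEnvironment env) {
        return env.executeAsync("Data Stream Collect");
    }

    public static void main(String[] args) {
        TestEnvironment env = new TestEnvironment(new FakeCluster("test-cluster"));
        System.out.println("execute: " + env.execute("my job"));
        System.out.println("collect: " + collect(env));

        FixedTestEnvironment fixed = new FixedTestEnvironment(new FakeCluster("test-cluster"));
        System.out.println("collect (fixed): " + collect(fixed));
    }
}
```

Running `main` prints `execute: test-cluster`, then `collect: fresh-cluster`, then `collect (fixed): test-cluster`: only a subclass that overrides `executeAsync` keeps the collect job on the shared cluster.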

Thanks in advance,

Bob

Re: Missing support for `TestStreamEnvironment#executeAsync`

Till Rohrmann
Hi Bob,

Thanks for reporting this issue. I believe this was an oversight. I have filed a JIRA issue to fix this problem [1].


Cheers,
Till

In reply to Bob Tiernay (Mon, Mar 8, 2021 at 4:15 PM)

Re: Missing support for `TestStreamEnvironment#executeAsync`

Bob Tiernay
Great, thank you so much!

In reply to Till Rohrmann (Tue, Mar 9, 2021 at 1:08 PM)