[Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

Niels Basjes
Hi,

I have test code (https://github.com/nielsbasjes/yauaa/blob/v5.15/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperInline.java#L140) that writes a DataStream to a List<> using LocalCollectionOutputFormat, so I can verify that the pipeline did what it should do:

List<TestRecord> result = new ArrayList<>(5);
testRecordDataSet
    .writeUsingOutputFormat(new LocalCollectionOutputFormat<>(result));
environment.execute();
assertEquals(2, result.size());

I was just upgrading to Flink 1.10 and found that writeUsingOutputFormat has apparently been deprecated:
* @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly using the
* {@link #addSink(SinkFunction)} method.
I'm not writing to a file at all, and looking at the API, StreamingFileSink does not seem to fit what I'm doing.
What is the correct way in Flink 1.10 to write a test that verifies the output of my test run?

--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

Tzu-Li Tai
Hi,

To collect the elements of a DataStream (usually only meant for testing
purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

Niels Basjes
Hi Gordon,

Thanks. This works for me.

I find it strange that this works (the three variants below differ only in whether and where resultDataStream.print() is called):
List<TestRecord> result = new ArrayList<>(5);
DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
resultDataStream.print();
environment.execute();

However, this does not work:

List<TestRecord> result = new ArrayList<>(5);
DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
environment.execute();

and this does not work either:

resultDataStream.print();
List<TestRecord> result = new ArrayList<>(5);
DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
environment.execute();

In both of these cases it fails with:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)


Did I do something wrong?
Is this a bug in DataStreamUtils?

Niels Basjes

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

rmetzger0
Hey Niels,

This minimal Flink job executes in Flink 1.10:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = env.addSource(new StringSourceFunction());
    List<String> result = new ArrayList<>(5);
    DataStreamUtils.collect(input).forEachRemaining(result::add);
    env.execute("Flink Streaming Java API Skeleton");
}
Maybe the TestUserAgentAnalysisMapperInline class is doing some magic that breaks the StreamGraphGenerator?

Best,
Robert

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

Niels Basjes
I tried this in Flink 1.10.0:
@Test
public void experimentalTest() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = env.fromElements("One", "Two");
    // DataStream<String> input = env.addSource(new StringSourceFunction());
    List<String> result = new ArrayList<>(5);
    DataStreamUtils.collect(input).forEachRemaining(result::add);
    env.execute("Flink Streaming Java API Skeleton");
}

This results in:

java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
...

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

rmetzger0
Hey,
You are right. I'm also seeing this exception now; it was hidden in other log output.

The solution to all this confusion is simple: DataStreamUtils.collect() is like an execute().

The stream graph is cleared on each execute(). That's why calling collect() and then execute() leads to the "No operators defined" error.
However, if you call collect(), print(), execute(), then print() fills the stream graph again, and you end up running two Flink jobs: the collect job and the execute job.
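To make the explanation above concrete, here is a minimal, hypothetical model in plain Java; it does not use Flink at all. ToyEnvironment, collect(), print() and run() are invented names that only mimic the behaviour described above: sinks register operators on the environment, collect() triggers an execution immediately, and every execution clears the registered operators. Sources, data and threading are deliberately left out, so treat this as a sketch of the idea, not of Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, NOT Flink code: it only mimics how operators
// accumulate on an environment and are cleared by every execution.
public class StreamGraphModel {

    // Hypothetical stand-in for StreamExecutionEnvironment.
    static class ToyEnvironment {
        private final List<String> operators = new ArrayList<>();
        private final List<List<String>> executedJobs = new ArrayList<>();

        void addOperator(String name) {
            operators.add(name);
        }

        // Mimics execute(): fails on an empty graph, otherwise records the
        // registered operators as one job and then clears them.
        void execute() {
            if (operators.isEmpty()) {
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot execute.");
            }
            executedJobs.add(new ArrayList<>(operators));
            operators.clear();
        }

        List<List<String>> executedJobs() {
            return executedJobs;
        }
    }

    // Mimics DataStreamUtils.collect(): registers a sink and executes immediately.
    static void collect(ToyEnvironment env) {
        env.addOperator("collect-sink");
        env.execute();
    }

    // Mimics DataStream.print(): only registers a sink, does not execute.
    static void print(ToyEnvironment env) {
        env.addOperator("print-sink");
    }

    // Replays a sequence of calls; returns the executed jobs or the error message.
    static String run(String... steps) {
        ToyEnvironment env = new ToyEnvironment();
        try {
            for (String step : steps) {
                switch (step) {
                    case "collect": collect(env); break;
                    case "print": print(env); break;
                    case "execute": env.execute(); break;
                    default: throw new IllegalArgumentException("Unknown step: " + step);
                }
            }
            return "jobs=" + env.executedJobs();
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage() + " (after jobs=" + env.executedJobs() + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(run("collect", "print", "execute")); // two jobs run
        System.out.println(run("collect", "execute"));          // fails on the empty graph
        System.out.println(run("print", "collect", "execute")); // fails on the empty graph
        System.out.println(run("collect"));                     // one job, no execute() needed
    }
}
```

The last call corresponds to dropping the explicit execute() entirely: in this model, collect() alone is enough to run the job once.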

I hope I got it right this time :) 

Best,
Robert

Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?

Niels Basjes
Yes, that's it!

My test now does this:

DataStream<TestRecord> resultDataStream = ...

List<TestRecord> result = new ArrayList<>(5);
DataStreamUtils
    .collect(resultDataStream)
    .forEachRemaining(result::add);

assertEquals(2, result.size());

And, as you explained, because collect() already does an execute(), this works like a charm.

Niels