DataStreamUtils not working properly

12 messages

DataStreamUtils not working properly

subashbasnet
Hello all,

I am trying to convert a DataStream to a collection, but the result is empty. The stream's data can be viewed on the console via print(), but the collection built from the same stream is empty after the conversion. Below is the code:

DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
centroids.print();
Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> testCentroids = Lists.newArrayList(iter);
for (Centroid c : testCentroids) {
    System.out.println(c);
}

The above centroids.print() gives the following output in console:

Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0

But the System.out.println(c) inside the for loop prints nothing. What could be the problem?

My Maven configuration for DataStreamUtils is:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>


Best Regards,
Subash Basnet


Re: DataStreamUtils not working properly

subashbasnet
Hello all,

I tried the same with a tuple, but the problem persists: the collection is still empty. I took the id of the centroid tuple and printed it; the stream prints fine, but the collection is empty.

DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
centroidId.print();
Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
for (Tuple1<String> c : testCentroids) {
    System.out.println(c);
}

centroidId.print() gives the following output:

(Mon Jul 18 17:36:03 CEST 2016)
(Mon Jul 18 17:43:58 CEST 2016)
(Mon Jul 18 17:42:59 CEST 2016)
(Mon Jul 18 17:34:01 CEST 2016)
(Mon Jul 18 17:52:00 CEST 2016)
(Mon Jul 18 17:40:58 CEST 2016)

but the System.out.println(c) in the loop prints nothing.

Best Regards,
Subash Basnet


Re: DataStreamUtils not working properly

Till Rohrmann
Have you checked whether your logs contain any problems? In general, it is not recommended to collect a streaming result back to the client. It might also be a problem with `DataStreamUtils.collect`.

Cheers,
Till



Re: DataStreamUtils not working properly

subashbasnet
Hello Till,

Shouldn't it write something to the Eclipse console if there is any error or warning? Nothing about an error is printed there. I also checked the Flink project folders (flink-core, flink-streaming, and so on) but couldn't find where the log is written when running via Eclipse.

Best Regards,
Subash Basnet



Re: DataStreamUtils not working properly

Till Rohrmann
It depends on whether you have a log4j.properties file on your classpath. If you see log output on the console, then errors should be printed there as well.
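For reference, a minimal log4j 1.x properties file that routes all log output to the console might look like this (a sketch; adjust the level and pattern to taste):

```
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c - %m%n
```

Placed under src/main/resources in a Maven project, it ends up on the classpath when running from Eclipse.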

Cheers,
Till



Re: DataStreamUtils not working properly

subashbasnet
Hello Till, 

Yes, I can see the log output in my console, but there is no information there about any error in the conversion, just the usual WARN and INFO messages below:
22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)

These messages are always there when I run my project. It would be great if someone could check why collecting the DataStream via DataStreamUtils gives an empty result.

Best Regards,
Subash Basnet



Re: DataStreamUtils not working properly

Maximilian Michels
Just tried the following and it worked:

public static void main(String[] args) throws IOException {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
   final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
   source.print();

   final Iterator<Integer> iter = DataStreamUtils.collect(source);
   while (iter.hasNext()) {
      System.out.println(iter.next());
   }
}

It prints:

1
2
3
4
2> 2
1> 1
4> 4
3> 3

However, the collect util needs some improvements. It assumes that the machine running the code is reachable on a random port by the Flink cluster. If you have any firewalls, then this might not work.

Cheers,
Max



Re: DataStreamUtils not working properly

subashbasnet
Hello Maximilian,

Thanks for the update. Yes, your example works, and it also works there when I build a collection from it. But it does not work in my DataStream case after the conversion:

DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
while (iter.hasNext()) {
    System.out.println(iter.next());
}
Collection<Centroid> testCentroids = Lists.newArrayList(iter);
for (Centroid c : testCentroids) {
    System.out.println(c);
}

In the above code the while loop prints the result below, but the for loop after the conversion prints nothing.

Tue Jul 19 15:49:00 CEST 2016  118.7 118.81 118.7 118.77 76300.0
Tue Jul 19 15:47:02 CEST 2016  118.85 118.885 118.8 118.84 75600.0
Tue Jul 19 15:46:00 CEST 2016  118.8627 118.93 118.79 118.8 76300.0
Tue Jul 19 15:45:59 CEST 2016  118.8 118.94 118.77 118.9 106800.0

I am not sure what the problem is: after the conversion the result is empty in my case, but your example works. Below is my newCentroidDataStream:

@SuppressWarnings("serial")
DataStream<Tuple2<String, Double[]>> newCentroidDataStream = keyedEdits.timeWindow(Time.seconds(1))
        .fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String, Double[]>>() {
            @Override
            public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock value) {
                Double[] columns = new Double[5]; // close, high, low, open, volume
                columns[0] = value.getClose();
                columns[1] = value.getHigh();
                columns[2] = value.getLow();
                columns[3] = value.getOpen();
                columns[4] = (double) value.getVolume();
                return new Tuple2<String, Double[]>(value.getId(), columns);
            }
        });

Regards,
Subash Basnet



Re: DataStreamUtils not working properly

Maximilian Michels
Ah, now I see where the problem lies: you are reusing the Iterator that you have already drained in the earlier loop. You can only iterate over the elements once! That is the nature of a Java Iterator, and DataStreamUtils.collect(..) returns an iterator.
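The single-use behavior is easy to reproduce with a plain Java iterator, independent of Flink. A minimal sketch, using a List in place of the collected stream:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorReuse {
    public static void main(String[] args) {
        Iterator<Integer> iter = Arrays.asList(1, 2, 3).iterator();

        // First pass drains the iterator, like the while loop above.
        while (iter.hasNext()) {
            System.out.println(iter.next());
        }

        // Second pass: the iterator is already exhausted, so copying it
        // into a collection yields nothing -- the same effect as calling
        // Lists.newArrayList(iter) after the while loop.
        List<Integer> copy = new ArrayList<>();
        iter.forEachRemaining(copy::add);
        System.out.println("copied " + copy.size() + " elements");
    }
}
```

So either consume the iterator exactly once, or materialize it into a collection first and then iterate over the collection as often as needed.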

>>>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
>>>>>>> for (Tuple1<String> c : testCentroids) {
>>>>>>> System.out.println(c);
>>>>>>> }
>>>>>>> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), but no output for System.out.println(c); Best Regards, Subash Basnet
>>>>>>>
>>>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I am trying to convert datastream to collection, but it's shows blank result. There is a stream of data which can be viewed on the console on print(), but the collection of the same stream shows empty after conversion. Below is the code:
>>>>>>>>
>>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>>>>>>>> centroids.print();
>>>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>>>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>>>>>>>> for(Centroid c: testCentroids){
>>>>>>>> System.out.println(c);
>>>>>>>> }
>>>>>>>>
>>>>>>>> The above centroids.print() gives the following output in console:
>>>>>>>>
>>>>>>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>>>>>>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>>>>>>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
>>>>>>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>>>>>>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>>>>>>>>
>>>>>>>> But the next System.out.println(c) within the for loop prints nothing. What could be the problem.
>>>>>>>>
>>>>>>>> My maven has following configuration for dataStreamUtils:
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.flink</groupId>
>>>>>>>> <artifactId>flink-streaming-contrib_2.10</artifactId>
>>>>>>>> <version>${flink.version}</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Subash Basnet
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: DataStreamUtils not working properly

subashbasnet
In reply to this post by subashbasnet
Hello Maximilian,

Thanks! I learned a new thing today :). But my problem still exists. Your example has little data, so it works fine; in my datastream, however, I have set a timeWindow of Time.seconds(5). What I found is that if I print as in your example:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
while (iter.hasNext()) {
    System.out.println(iter.next());
}

It prints the result in a streaming manner. But if I instead collect into an ArrayList and print as below:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
List<Centroid> testCentroids = new ArrayList<Centroid>();
while (iter.hasNext()) {
    testCentroids.add(iter.next());
}
for (Centroid centroid : testCentroids) {
    System.out.println(centroid);
}

It seems to wait until the whole stream has been collected into the ArrayList, and only then prints all the values. I waited roughly two minutes; the ArrayList was finally printed, and the program ended automatically right after, along with some exception messages. Why does this ArrayList collection wait until a huge collection from multiple input streams of centroids is printed at once? What could be the issue?
It also printed the following exceptions along with all the items in the ArrayList:

Exception in thread "Thread-1" java.lang.RuntimeException: Exception in execute()
at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1195)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)

Regards,
Subash Basnet


On Wed, Jul 20, 2016 at 2:05 PM, Maximilian Michels <[hidden email]> wrote:

Ah, now I see where the problem lies. You're reusing the Iterator
which you have already used in the for loop. You can only iterate over
the elements once! This is the nature of the Java Iterator and
DataStreamUtils.collect(..) returns an iterator.
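
[Editor's note: a minimal, Flink-independent sketch of this single-pass behavior of a Java `Iterator`, using a plain list iterator for illustration:]

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorSinglePass {
    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        Iterator<Integer> iter = data.iterator();

        // First pass: consumes every element of the iterator.
        while (iter.hasNext()) {
            System.out.println(iter.next());
        }

        // The iterator is now exhausted; hasNext() is false, so any second
        // pass over the SAME iterator (e.g. Lists.newArrayList(iter)) sees
        // no elements at all.
        System.out.println("hasNext after first pass: " + iter.hasNext());
    }
}
```

The same applies to the iterator returned by `DataStreamUtils.collect`: once a loop has drained it, handing it to `Lists.newArrayList` yields an empty list.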

On Wed, Jul 20, 2016 at 1:11 PM, subash basnet <[hidden email]> wrote:
>
> Hello Maximilian,
>
> Thanks for the update. Yup, it works in the example you gave. I checked with the collection also, and it works, but not in my datastream case after the collection.
> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> while (iter.hasNext()) {
> System.out.println(iter.next());
> }
> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
> for (Centroid c : testCentroids) {
> System.out.println(c);
> }
>
> In the above code the while loop prints the result as below, but the next for loop after the collection gives blank.
>
> Tue Jul 19 15:49:00 CEST 2016  118.7 118.81 118.7 118.77 76300.0
> Tue Jul 19 15:47:02 CEST 2016  118.85 118.885 118.8 118.84 75600.0
> Tue Jul 19 15:46:00 CEST 2016  118.8627 118.93 118.79 118.8 76300.0
> Tue Jul 19 15:45:59 CEST 2016  118.8 118.94 118.77 118.9 106800.0
>
> Not sure, what is the problem, as after collection it gives blank result in my case but works in the example you gave. Below is my newCentroidDataStream:
>
> @SuppressWarnings("serial")
> DataStream<Tuple2<String, Double[]>> newCentroidDataStream = keyedEdits.timeWindow(Time.seconds(1))
> .fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String, Double[]>>() {
> @Override
> public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock value) {
> Double[] columns = new Double[5];// close,high,low,open,volume
> columns[0] = value.getClose();
> columns[1] = value.getHigh();
> columns[2] = value.getLow();
> columns[3] = value.getOpen();
> columns[4] = (double) value.getVolume();
> return (new Tuple2<String, Double[]>(value.getId(), columns));
> }
> });
>
> Regards,
> Subash Basnet
>


Reply | Threaded
Open this post in threaded view
|

Re: DataStreamUtils not working properly

Maximilian Michels
Everything works as expected. The while loop blocks until the iterator doesn't have data anymore (=the program has ended). All data will end up in the ArrayList.

The latter exception comes from a duplicate call to execute(). Actually, collect() internally calls execute() because the job has to run to transfer data back to the client.




Reply | Threaded
Open this post in threaded view
|

Re: DataStreamUtils not working properly

subashbasnet
Hello Maximilian,

Then, as you said, regarding the code in the starting question of the thread:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> testCentroids = Lists.newArrayList(iter);
for (Centroid c : testCentroids) {
    System.out.println(c);
}

The above waits until the iterator has no more data and finally prints all the data in testCentroids. After this, the program throws the exception mentioned earlier and ends. What would be the way to print the intermediate collection in a streaming manner, i.e. at certain intervals of time, rather than waiting until the end and printing everything at once?
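
[Editor's note: one way to get both behaviors from a single pass is to print each element as it arrives while also buffering it for later use, inside the same while loop. A sketch, with a plain Java iterator standing in for the blocking one returned by `DataStreamUtils.collect`:]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class StreamAndBuffer {
    public static void main(String[] args) {
        // Stand-in for DataStreamUtils.collect(centroids); in the real job
        // this iterator blocks until the next element arrives from the stream.
        Iterator<String> iter = Arrays.asList("c1", "c2", "c3").iterator();

        List<String> buffered = new ArrayList<>();
        while (iter.hasNext()) {
            String next = iter.next();
            System.out.println(next); // printed as soon as the element arrives
            buffered.add(next);       // kept for processing at a later stage
        }

        // buffered now holds every element for reuse after the stream ends.
        System.out.println("buffered " + buffered.size() + " elements");
    }
}
```

This only sidesteps the single-pass limitation of the iterator; with an unbounded stream the loop (and the buffer) never finishes, so any "later stage" logic would have to run inside the loop or on a bounded window of elements.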

And since the exception occurs and the program ends, how will I be able to use testCentroids in the operations that I need to perform at a later stage of the program?

Regards,
Subash Basnet

On Wed, Jul 20, 2016 at 4:00 PM, Maximilian Michels <[hidden email]> wrote:
Everything works as expected. The while loop blocks until the iterator doesn't have data anymore (=the program has ended). All data will end up in the ArrayList.

The latter exception comes from a duplicate call to execute(). Actually, collect() internally calls execute() because the job has to run to transfer data back to the client.

On Wed, Jul 20, 2016 at 3:38 PM, subash basnet <[hidden email]> wrote:
hello maximilian,

Thanks! I learned new thing today :). But my problem still exists. Your example has little data and it works fine. But in my datastream I have set timeWindow as Time.seconds(5). What I found out is, if I print as below as your example:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
List<Centroid> testCentroids = new ArrayList<Centroid>();
  while (iter.hasNext()) {
System.out.println(iter.next());
}

It prints the result in a streaming manner. But now if I collect in arrayList and print as below:

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
List<Centroid> testCentroids = new ArrayList<Centroid>();
  while (iter.hasNext()) {
testCentroids.add(iter.next());
  }
for(Centroid centroid: testCentroids){
System.out.println(centroid);
}

It waits for all the time, till all the stream get's collected in the arrayList I guess, and prints all the values in the arraylist finally. I had just waited for roughly around 2 minutes, found out that arraylist got printed and the program ended automatically after the print of the arraylist along with some exception messages. Why is this arraylist collection waiting till a huge collection of multiple input stream of centroids gets printed at once. What could be the issue. 
And it printed the following exceptions also along with all the items in the arraylist:

Exception in thread "Thread-1" java.lang.RuntimeException: Exception in execute()
at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1195)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)

Regards,
Subash Basnet


Best Regards,
Subash Basnet


On Wed, Jul 20, 2016 at 2:05 PM, Maximilian Michels <[hidden email]> wrote:
Boxbe This message is eligible for Automatic Cleanup! ([hidden email]) Add cleanup rule | More info

Ah, now I see where the problem lies. You're reusing the Iterator
which you have already used in the for loop. You can only iterate over
the elements once! This is the nature of the Java Iterator and
DataStreamUtils.collect(..) returns an iterator.

On Wed, Jul 20, 2016 at 1:11 PM, subash basnet <[hidden email]> wrote:
>
> Hello Maximilian,
>
> Thank's for the update. Yup it works in the example you gave. I checked with collection also it works. But not in my datastream case after the collection.
> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
> while (iter.hasNext()) {
> System.out.println(iter.next());
> }
> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
> for (Centroid c : testCentroids) {
> System.out.println(c);
> }
>
> In the above code the while loop prints the result as below, but the next for loop after the collection gives blank.
>
> Tue Jul 19 15:49:00 CEST 2016  118.7 118.81 118.7 118.77 76300.0
> Tue Jul 19 15:47:02 CEST 2016  118.85 118.885 118.8 118.84 75600.0
> Tue Jul 19 15:46:00 CEST 2016  118.8627 118.93 118.79 118.8 76300.0
> Tue Jul 19 15:45:59 CEST 2016  118.8 118.94 118.77 118.9 106800.0
>
> Not sure, what is the problem, as after collection it gives blank result in my case but works in the example you gave. Below is my newCentroidDataStream:
>
> @SuppressWarnings("serial")
> DataStream<Tuple2<String, Double[]>> newCentroidDataStream = keyedEdits.timeWindow(Time.seconds(1))
> .fold(new Tuple2<>("", columns1), new FoldFunction<Stock, Tuple2<String, Double[]>>() {
> @Override
> public Tuple2<String, Double[]> fold(Tuple2<String, Double[]> st, Stock value) {
> Double[] columns = new Double[5];// close,high,low,open,volume
> columns[0] = value.getClose();
> columns[1] = value.getHigh();
> columns[2] = value.getLow();
> columns[3] = value.getOpen();
> columns[4] = (double) value.getVolume();
> return (new Tuple2<String, Double[]>(value.getId(), columns));
> }
> });
>
> Regards,
> Subash Basnet
>
> On Wed, Jul 20, 2016 at 12:20 PM, Maximilian Michels <[hidden email]> wrote:
>>
>> This message is eligible for Automatic Cleanup! ([hidden email]) Add cleanup rule | More info
>>
>> Just tried the following and it worked:
>>
>> public static void main(String[] args) throws IOException {
>>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>    final DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);
>>    source.print();
>>
>>    final Iterator<Integer> iter = DataStreamUtils.collect(source);
>>    while (iter.hasNext()) {
>>       System.out.println(iter.next());
>>    }
>> }
>>
>> It prints:
>>
>> 1
>> 2
>> 3
>> 4
>> 2> 2
>> 1> 1
>> 4> 4
>> 3> 3
>>
>> However, the collect util needs some improvements. It assumes that the machine running the code is reachable on a random port by the Flink cluster. If you have any firewalls, then this might not work.
>>
>> Cheers,
>> Max
>>
>> On Tue, Jul 19, 2016 at 10:13 PM, subash basnet <[hidden email]> wrote:
>>>
>>> Hello Till,
>>>
>>> Yup I can see the log output in my console, but there is no information there regarding if there is any error in conversion. Just normal warn and info as below:
>>> 22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
>>> 22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
>>>
>>> The above message is always there when I run my project. It would be great if someone could check why collecting the DataStream via DataStreamUtils gives an empty result.
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>> On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann <[hidden email]> wrote:
>>>>
>>>> It depends on whether you have a log4j.properties file on your classpath. If you see log output on the console, then it should also print errors there.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet <[hidden email]> wrote:
>>>>>
>>>>> Hello Till,
>>>>>
>>>>> Shouldn't it write something to the Eclipse console if there is an error or warning? Nothing about an error is printed there. I also checked the Flink project folders (flink-core, flink-streaming, and so on) but couldn't find where the log is written when running via Eclipse.
>>>>>
>>>>> Best Regards,
>>>>> Subash Basnet
>>>>>
>>>>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann <[hidden email]> wrote:
>>>>>>
>>>>>> Have you checked whether your logs contain any problems? In general it is not recommended to collect the streaming result back to your client. It might also be a problem with `DataStreamUtils.collect`.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I checked whether it works for tuples, but it's the same problem: the collection still comes back empty. I took the id of the centroid tuple and printed it, but the collection is empty.
>>>>>>>
>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>>>>>>> DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
>>>>>>> centroidId.print();
>>>>>>> Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
>>>>>>> Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
>>>>>>> for (Tuple1<String> c : testCentroids) {
>>>>>>>     System.out.println(c);
>>>>>>> }
>>>>>>>
>>>>>>> centroidId.print() gives the following output:
>>>>>>>
>>>>>>> (Mon Jul 18 17:36:03 CEST 2016)
>>>>>>> (Mon Jul 18 17:43:58 CEST 2016)
>>>>>>> (Mon Jul 18 17:42:59 CEST 2016)
>>>>>>> (Mon Jul 18 17:34:01 CEST 2016)
>>>>>>> (Mon Jul 18 17:52:00 CEST 2016)
>>>>>>> (Mon Jul 18 17:40:58 CEST 2016)
>>>>>>>
>>>>>>> but there is no output from System.out.println(c).
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Subash Basnet
>>>>>>>
>>>>>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I am trying to convert a DataStream to a collection, but the result is empty. The stream's data can be seen on the console via print(), but the collection built from the same stream is empty after conversion. Below is the code:
>>>>>>>>
>>>>>>>> DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
>>>>>>>> centroids.print();
>>>>>>>> Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
>>>>>>>> Collection<Centroid> testCentroids = Lists.newArrayList(iter);
>>>>>>>> for (Centroid c : testCentroids) {
>>>>>>>>     System.out.println(c);
>>>>>>>> }
>>>>>>>>
>>>>>>>> The above centroids.print() gives the following output in console:
>>>>>>>>
>>>>>>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>>>>>>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>>>>>>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
>>>>>>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>>>>>>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>>>>>>>>
>>>>>>>> But the next System.out.println(c) within the for loop prints nothing. What could be the problem?
>>>>>>>>
>>>>>>>> My Maven POM has the following dependency for DataStreamUtils:
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>>     <artifactId>flink-streaming-contrib_2.10</artifactId>
>>>>>>>>     <version>${flink.version}</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Subash Basnet
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>