Unable to understand datastream error message


subashbasnet
Hello all,

I am getting the error below when I call execute() on the StreamExecutionEnvironment.

Caused by: java.lang.IllegalStateException: Iteration FeedbackTransformation{id=15, name='Feedback', outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String, pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback edges.

The run method inside the thread class of DataStreamUtils handles this exception:
    @Override
    public void run() {
        try {
            stream.getExecutionEnvironment().execute();
        } catch (Exception e) {
            throw new RuntimeException("Exception in execute()", e);
        }
    }

I cannot work out what this error message means or how to fix it.


Best Regards,
Subash Basnet
Re: Unable to understand datastream error message

Aljoscha Krettek
Could you please post your code?

Re: Unable to understand datastream error message

subashbasnet
Hello Aljoscha,

Below is a shortened version of the StockAnalysis class, which is a DataStream adaptation of the KMeans.java DataSet code.

public class StockAnalysis {
    public static void main(String[] args) {
        DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
        loop = centroids.iterate(10);
        DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter()).map(new CountAppender()).keyBy(0)
                .reduce(new CentroidAccumulator()).map(new CentroidAverager());
    }

    public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
        private Collection<Centroid> centroids;

        @Override
        public void open(Configuration parameters) throws Exception {
            Iterator<Centroid> iter = DataStreamUtils.collect(loop);
            this.centroids = Lists.newArrayList(iter);
        }

        @Override
        public Tuple2<String, Point> map(Point p) throws Exception {
            for (Centroid centroid : centroids) {
            }...................
        }
    }
}


Re: Unable to understand datastream error message

subashbasnet
Hello, 

I had to declare
    private static IterativeStream<Centroid> loop;
as a global (static) variable, because the DataStream API does not let me broadcast it the way the DataSet API does.

I tried to use closeWith in the DataStream program, as in the DataSet version, but it throws an exception:
DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);


Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
    at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
    at wikiedits.StockAnalysis.main(StockAnalysis.java:64)


Best Regards,
Subash Basnet


Re: Unable to understand datastream error message

Aljoscha Krettek
There you have your explanation. A loop actually has to be a loop for it to work in Flink.
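
To make that concrete, here is a rough toy model in plain Java of what the two error messages in this thread appear to be checking. It does not use Flink's classes: Node, Iteration, transitivePredecessors() and validate() are hypothetical names made up for this sketch, and only the exception messages are taken from the posts above. The idea is that a feedback stream must have the iteration somewhere upstream of it, and an iteration that is never closed has no feedback edge at all.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a dataflow graph. These are NOT Flink's classes. */
public class IterationSketch {

    /** A node in the toy graph; it only knows which nodes feed into it. */
    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }

        /** Every node reachable by following inputs upstream, including this node. */
        Set<Node> transitivePredecessors() {
            Set<Node> seen = new HashSet<>();
            collect(this, seen);
            return seen;
        }

        private static void collect(Node n, Set<Node> seen) {
            if (seen.add(n)) {
                for (Node in : n.inputs) {
                    collect(in, seen);
                }
            }
        }
    }

    /** Toy iteration head (hypothetical stand-in for an iterative stream). */
    static class Iteration extends Node {
        final List<Node> feedbackEdges = new ArrayList<>();

        Iteration(Node input) {
            super("iteration", input);
        }

        /** Accepts the feedback node only if this iteration is upstream of it. */
        void closeWith(Node feedback) {
            if (!feedback.transitivePredecessors().contains(this)) {
                throw new UnsupportedOperationException(
                        "Cannot close an iteration with a feedback DataStream "
                                + "that does not originate from said iteration.");
            }
            feedbackEdges.add(feedback);
        }

        /** Mimics the check at execute(): an unclosed iteration is rejected. */
        void validate() {
            if (feedbackEdges.isEmpty()) {
                throw new IllegalStateException(
                        "Iteration " + name + " does not have any feedback edges.");
            }
        }
    }

    public static void main(String[] args) {
        Node centroids = new Node("centroids");
        Node points = new Node("points");
        Iteration loop = new Iteration(centroids);

        // Built from points only: loop is not upstream, so closing is rejected.
        Node detached = new Node("newCentroids", points);
        try {
            loop.closeWith(detached);
        } catch (UnsupportedOperationException e) {
            System.out.println("closeWith rejected: " + e.getMessage());
        }

        // The iteration was never closed, so the execute-time check fails.
        try {
            loop.validate();
        } catch (IllegalStateException e) {
            System.out.println("validate rejected: " + e.getMessage());
        }

        // Built from points AND loop: now the graph really contains a loop.
        Node connected = new Node("newCentroids", points, loop);
        loop.closeWith(connected);
        loop.validate();
        System.out.println("feedback edges: " + loop.feedbackEdges.size());
    }
}
```

In the posted program, newCentroids is derived from points alone, and loop is only read on the side through DataStreamUtils.collect(loop) inside open(). That corresponds to the "detached" case in the sketch. For the iteration to close, loop would have to be an actual input of some operator on the path that produces newCentroids.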

Re: Unable to understand datastream error message

subashbasnet
Hello Aljoscha,

For DataSet: 
IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
DataSet<Centroid> newCentroids = points.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
.map(new CountAppender()).groupBy(0).reduce(new CentroidAccumulator())
.map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);


This works fine. If I do the same with the DataStream API, as below:
IterativeStream<Centroid> loop = centroids.iterate(numIterations);
DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter()).map(new CountAppender()).keyBy(0)
.reduce(new CentroidAccumulator()).map(new CentroidAverager());
DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);

I get the following exception, as already mentioned earlier in this thread:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
    at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
    at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)

Could you please tell me where I am going wrong here?


Best Regards,
Subash Basnet

