Flink program compiled with Janino fails

Flink program compiled with Janino fails

Giacomo Licari
Hi guys,
I'm developing a dynamic Flink program composer, which receives a dataflow from a client and converts it into Flink code.

I have tried to compile a test Flink program with Janino, but it fails. The error I receive is:
org.codehaus.commons.compiler.CompileException: Line 66, Column 0: Non-abstract class "FlinkExecutor$1" must implement method "public abstract java.lang.Object org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object) throws java.lang.Exception"

It seems Janino doesn't recognize the MapFunction.
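For context, this is roughly how I feed the generated source to Janino (a simplified sketch; ComposerRunner and the generatedSource parameter are just placeholders for what my composer builds dynamically):

import org.codehaus.janino.SimpleCompiler;

public class ComposerRunner {

    // Simplified sketch: 'generatedSource' holds the full source of
    // Test.FlinkExecutor as a String, assembled by the composer.
    public static void compileAndRun(String generatedSource) throws Exception {
        SimpleCompiler compiler = new SimpleCompiler();
        compiler.cook(generatedSource); // the CompileException above is thrown here
        Class<?> cls = compiler.getClassLoader().loadClass("Test.FlinkExecutor");
        cls.getMethod("main", String[].class).invoke(null, (Object) new String[0]);
    }
}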

If I put this code into a Java file and execute it from Eclipse, everything works fine.

Here is the code I used:

package Test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import com.Flink.Operators.Source;

public class FlinkExecutor {

    public static class RainPOJO {
        private String altitude;
        private String city_name;
        private String latitude;
        private String longitude;
        private String rainfall;
        private String station_name;
        private String time;

        public String getAltitude() {
            return altitude;
        }

        public void setAltitude(String altitude) {
            this.altitude = altitude;
        }

        public String getCity_name() {
            return city_name;
        }

        public void setCity_name(String city_name) {
            this.city_name = city_name;
        }

        public String getLatitude() {
            return latitude;
        }

        public void setLatitude(String latitude) {
            this.latitude = latitude;
        }

        public String getLongitude() {
            return longitude;
        }

        public void setLongitude(String longitude) {
            this.longitude = longitude;
        }

        public String getRainfall() {
            return rainfall;
        }

        public void setRainfall(String rainfall) {
            this.rainfall = rainfall;
        }

        public String getStation_name() {
            return station_name;
        }

        public void setStation_name(String station_name) {
            this.station_name = station_name;
        }

        public String getTime() {
            return time;
        }

        public void setTime(String time) {
            this.time = time;
        }
    }

    public FlinkExecutor() {}

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setDegreeOfParallelism(1);

        Source source = new Source("sensor", "rain");
        String path_Source = source.getCSVPath();

        DataSet<RainPOJO> ds_s1 = env.readCsvFile("file://" + path_Source)
            .ignoreFirstLine()
            .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
                      "longitude", "rainfall", "station_name", "time");

        final long size = ds_s1.count();
        final long startTime = System.currentTimeMillis();

        ds_s1.map(new MapFunction<RainPOJO, String>() {
            int count = 0;

            @Override
            public String map(RainPOJO obj) throws Exception {
                count += 1;
                long endTime = System.currentTimeMillis();
                double elapsed_time = endTime - startTime;
                if (count == size) {
                    double d_seconds = elapsed_time / 1000;
                    return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + " seconds";
                }
                return " " + count;
            }
        })
        .print();
    }
}

Re: Flink program compiled with Janino fails

Till Rohrmann

I’m not a Janino expert, but it might be related to the fact that Janino does not fully support generic types (see http://unkrig.de/w/Janino under "Limitations"). Maybe it works if you use the raw (untyped) MapFunction type.
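Something along these lines might do it (just an untested sketch, reusing ds_s1 and RainPOJO from your snippet; the returns() hint tells Flink the output type, since it can no longer be inferred from the raw function):

// Raw (non-generic) MapFunction so Janino never has to deal with the
// generic signature; the cast replaces what the type parameter would do.
ds_s1.map(new MapFunction() {
    @Override
    public Object map(Object value) throws Exception {
        RainPOJO obj = (RainPOJO) value; // manual cast instead of generics
        return "row " + obj.getStation_name();
    }
})
.returns(String.class) // type hint for Flink, since the raw function carries no type info
.print();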

Cheers,
Till

