Hi guys,
I'm developing a dynamic Flink program composer, which receives a dataflow from a client and converts it into Flink code.

I have tried to compile a test Flink program with Janino, but it fails. The error I receive is:

org.codehaus.commons.compiler.CompileException: Line 66, Column 0: Non-abstract class "FlinkExecutor$1" must implement method "public abstract java.lang.Object org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object) throws java.lang.Exception"

It seems Janino doesn't recognize the MapFunction. If I put this code into a Java file and run it with Eclipse, everything works fine.

Here is the code I used:

    package Test;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import com.Flink.Operators.Source;

    public class FlinkExecutor {

        public static class RainPOJO {
            private String altitude;
            private String city_name;
            private String latitude;
            private String longitude;
            private String rainfall;
            private String station_name;
            private String time;

            public String getAltitude() {
                return altitude;
            }

            public void setAltitude(String Altitude) {
                this.altitude = Altitude;
            }

            public String getCity_name() {
                return city_name;
            }

            public void setCity_name(String City_name) {
                this.city_name = City_name;
            }

            public String getLatitude() {
                return latitude;
            }

            public void setLatitude(String Latitude) {
                this.latitude = Latitude;
            }

            public String getLongitude() {
                return longitude;
            }

            public void setLongitude(String Longitude) {
                this.longitude = Longitude;
            }

            public String getRainfall() {
                return rainfall;
            }

            public void setRainfall(String Rainfall) {
                this.rainfall = Rainfall;
            }

            public String getStation_name() {
                return station_name;
            }

            public void setStation_name(String Station_name) {
                this.station_name = Station_name;
            }

            public String getTime() {
                return time;
            }

            public void setTime(String Time) {
                this.time = Time;
            }
        }

        public FlinkExecutor() {}

        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setDegreeOfParallelism(1);

            Source Source = new Source("sensor", "rain");
            String path_Source = Source.getCSVPath();

            DataSet<RainPOJO> ds_s1 = env.readCsvFile("file://" + path_Source)
                .ignoreFirstLine()
                .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
                          "longitude", "rainfall", "station_name", "time");

            long size = ds_s1.count();
            long startTime = System.currentTimeMillis();

            ds_s1.map(new MapFunction<RainPOJO, String>() {
                int count = 0;

                @Override
                public String map(RainPOJO obj) throws Exception {
                    count += 1;
                    long endTime = System.currentTimeMillis();
                    double elapsed_time = endTime - startTime;
                    if (count == size) {
                        double d_seconds = elapsed_time / 1000;
                        return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + " seconds";
                    }
                    return " " + count;
                }
            })
            .print();
        }
    }
I’m not a Janino expert, but it might be related to the fact that Janino does not fully support generic types (see http://unkrig.de/w/Janino under limitations). Maybe it works if you use the untyped

Cheers,

On Sat, Oct 3, 2015 at 8:04 PM, Giacomo Licari <[hidden email]> wrote:
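
For illustration only, here is a rough sketch of what an untyped (raw) mapper looks like in plain Java. The error message asks for a method with the erased signature map(java.lang.Object) returning java.lang.Object, and a raw implementation declares exactly that, doing the cast by hand. Assumptions: the MapFunction interface below is a local, hypothetical stand-in with the same shape as Flink's, so the sketch compiles without Flink on the classpath; RainPOJO is cut down to one field; and rawMapper and applyAll are made-up helper names. Whether Janino accepts this form was not checked here.

```java
import java.util.ArrayList;
import java.util.List;

public class RawMapFunctionSketch {

    // Hypothetical stand-in for org.apache.flink.api.common.functions.MapFunction,
    // declared locally so the sketch compiles without Flink on the classpath.
    public interface MapFunction<T, O> {
        O map(T value) throws Exception;
    }

    // Cut-down stand-in for the RainPOJO from the question (one field only).
    public static class RainPOJO {
        private String rainfall;

        public String getRainfall() {
            return rainfall;
        }

        public void setRainfall(String rainfall) {
            this.rainfall = rainfall;
        }
    }

    // Raw (untyped) implementation: no type arguments on MapFunction, so the
    // overridden method has the erased signature Object map(Object), and the
    // cast to RainPOJO is done explicitly inside the method.
    @SuppressWarnings("rawtypes")
    public static MapFunction rawMapper() {
        return new MapFunction() {
            int count = 0;

            @Override
            public Object map(Object obj) throws Exception {
                RainPOJO rain = (RainPOJO) obj;
                count += 1;
                return count + ": " + rain.getRainfall();
            }
        };
    }

    // Small driver that applies the raw mapper to a list, standing in for
    // DataSet.map(...) in the original program.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static List<String> applyAll(List<RainPOJO> input) throws Exception {
        MapFunction mapper = rawMapper();
        List<String> out = new ArrayList<String>();
        for (RainPOJO r : input) {
            out.add((String) mapper.map(r));
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        RainPOJO a = new RainPOJO();
        a.setRainfall("0.5");
        RainPOJO b = new RainPOJO();
        b.setRainfall("1.2");

        List<RainPOJO> input = new ArrayList<RainPOJO>();
        input.add(a);
        input.add(b);

        for (String line : applyAll(input)) {
            System.out.println(line);
        }
    }
}
```

The trade-off is that the compiler no longer checks the element type, so a wrong input type only shows up as a ClassCastException at runtime.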