From: "Stefano Bortoli" <[hidden email]>
To: "user" <[hidden email]>
Cc: [hidden email]
Sent: Tuesday, November 25, 2014 5:05:34 PM
Subject: Re: Program crashes trying to read JSON file

Very quickly, it seems you are trying to sum on Strings

Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.

Check your tuple types and be sure that you are not summing on strings.

2014-11-25 16:55 GMT+01:00 Anirvan BASU <[hidden email]>:

Hello all,

We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a Flink (3-tuple based) dataset, then performing some operations.

We encountered the following runtime error:

[QUOTE]
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
at org.apache.flink.client.program.Client.run(Client.java:244)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
... 6 more
[/QUOTE]

The code snippet that could have caused this error (i.e. that we edited) is the following:

[CODE]
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...
public static void main(String[] args) throws Exception {
    if(!parseParameters(args)) {
        return;
    }
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input data
    DataSet<String> text = getTextDataSet(env);
    DataSet<Tuple3<Integer, String, String>> counts =
        // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
        text.flatMap(new SelectDataFlatMap())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(2)
        .sum(2);
    // emit result
    if(fileOutput) {
        counts.writeAsCsv(outputPath, "\n", " ");
    } else {
        counts.print();
    }
    // execute program
    env.execute("Weblogs Programme");
}
...
public static class SelectDataFlatMap extends
        JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
    @Override
    public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
            throws Exception {
        try {
            out.collect(new Tuple3<Integer, String, String>(
                getInt(value, "timestamp"),
                getString(value, "uuid"),
                getString(value, "event")));
        } catch (JSONException e) {
            System.err.println("Field not found");
        }
    }
}
[/CODE]

The JSON file is of the following nature, with a 2-level hierarchy for one field:

[JSON]
{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}
[/JSON]

Thanks in advance for helping us to understand where we are going wrong.
Anirvan
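
A note on the reply at the top of this thread: the posted code calls groupBy(2).sum(2), and field 2 of the Tuple3<Integer, String, String> is the String "event", which is what the exception complains about. If the intent was to count records per event (an assumption, the post does not say), one common pattern is to emit a numeric 1 per record and sum that instead; in Flink this would presumably mean emitting something like a Tuple2<String, Integer>(event, 1) and calling groupBy(0).sum(1). The plain-Java sketch below only illustrates the idea without any Flink dependency. The class and method names (EventCountSketch, LogRecord, countByEvent), the shortened uuids, and the third record are made up for illustration, and it assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EventCountSketch {

    /** Hypothetical stand-in for the (timestamp, uuid, event) 3-tuple from the post. */
    static final class LogRecord {
        final int timestamp;
        final String uuid;
        final String event;

        LogRecord(int timestamp, String uuid, String event) {
            this.timestamp = timestamp;
            this.uuid = uuid;
            this.event = event;
        }
    }

    /**
     * Groups records by the String field "event" and sums a numeric 1 per record.
     * The grouping key may be a String; only the summed value has to be numeric.
     */
    static Map<String, Integer> countByEvent(List<LogRecord> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (LogRecord r : records) {
            // Add 1 to the running count for this event (starts at 1 if absent).
            counts.merge(r.event, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative records; uuids are shortened placeholders, not real data.
        List<LogRecord> records = Arrays.asList(
            new LogRecord(1397731764, "uuid-a", "General,Login,Success"),
            new LogRecord(1397731765, "uuid-b", "General,App,Opened"),
            new LogRecord(1397731766, "uuid-a", "General,Login,Success"));

        // prints {General,Login,Success=2, General,App,Opened=1}
        System.out.println(countByEvent(records));
    }
}
```

The point of the sketch is only that the field being summed is an Integer, while the String field serves as the grouping key.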