From: "Stefano Bortoli" <[hidden email]>
To: "user" <[hidden email]>
Sent: Wednesday, November 26, 2014 8:37:59 AM
Subject: Re: Program crashes trying to read JSON file
Greetings,
Keep in mind that this will output your tuple dataset. Therefore, if you want to shape your output differently, further processing may be necessary.
You can output your results in different ways. If all you need is to write a file, I normally use the writeAsText method (there are also writeAsCsv and writeAsFormattedText), or you can write with your own custom FileOutputFormat:
datasetToPrint.writeAsText("/path/to/file/with/permission", WriteMode.OVERWRITE);
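The sinks mentioned above differ mainly in how each record becomes a line of text. As a rough plain-Java illustration (not Flink code; the `Record` class and the `toTupleLine`, `toCsvLine`, `toCustomLine` and `render` helpers are hypothetical), the sketch below approximates a tuple's default text form, a CSV row built with the delimiters used in `counts.writeAsCsv(outputPath, "\n", " ")`, and a custom shape produced by an extra processing step. It only builds strings and writes nothing to disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class OutputShapes {

    // Hypothetical stand-in for Tuple4<Integer, String, String, Integer>:
    // (timestamp, uuid, event, count). This is NOT a Flink class.
    static final class Record {
        final int timestamp;
        final String uuid;
        final String event;
        final int count;

        Record(int timestamp, String uuid, String event, int count) {
            this.timestamp = timestamp;
            this.uuid = uuid;
            this.event = event;
            this.count = count;
        }
    }

    // Approximates a tuple's default toString(), which a plain text sink writes per record.
    static String toTupleLine(Record r) {
        return "(" + r.timestamp + "," + r.uuid + "," + r.event + "," + r.count + ")";
    }

    // Approximates a CSV sink: fields joined by the field delimiter (no quoting here).
    static String toCsvLine(Record r, String fieldDelimiter) {
        StringJoiner joiner = new StringJoiner(fieldDelimiter);
        joiner.add(Integer.toString(r.timestamp))
              .add(r.uuid)
              .add(r.event)
              .add(Integer.toString(r.count));
        return joiner.toString();
    }

    // A custom shape of the kind you would build in an extra processing step before writing.
    static String toCustomLine(Record r) {
        return r.event + " -> " + r.count;
    }

    // Appends the row delimiter after every line, as a file sink writes one record per row.
    static String render(List<String> lines, String rowDelimiter) {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append(rowDelimiter);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Illustrative records; uuids shortened for readability.
        List<Record> records = new ArrayList<>();
        records.add(new Record(1397731764, "u1", "General,Login,Success", 1));
        records.add(new Record(1397731765, "u2", "General,App,Opened", 1));

        List<String> tuples = new ArrayList<>();
        List<String> csv = new ArrayList<>();
        List<String> custom = new ArrayList<>();
        for (Record r : records) {
            tuples.add(toTupleLine(r));
            csv.add(toCsvLine(r, " "));
            custom.add(toCustomLine(r));
        }
        System.out.print(render(tuples, "\n"));
        System.out.print(render(csv, "\n"));
        System.out.print(render(custom, "\n"));
    }
}
```

Because the event values themselves contain commas (e.g. `General,Login,Success`), the comma-separated tuple form is ambiguous to read back, which is one reason a space as field delimiter, or a custom format, can be the better choice.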
Stefano
2014-11-25 22:04 GMT+01:00 Anirvan BASU <[hidden email]>:
Thanks to Aljoscha and Stefano for pointing out the flaw. We corrected the issue as follows:
[CODE]import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...
public static void main(String[] args) throws Exception {
if(!parseParameters(args)) {
return;
}
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<String> text = getTextDataSet(env);
DataSet<Tuple4<Integer, String, String, Integer>> counts =
// split up the lines into 4-tuples containing: (timestamp, uuid, event, count)
text.flatMap(new SelectDataFlatMap())
// group by tuple field 2 (the event String) and sum up tuple field 3 (the Integer count, always 1)
.groupBy(2)
.sum(3);
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
} else {
counts.print();
}
// execute program
env.execute("Weblogs Programme");
}
...
public static class SelectDataFlatMap extends
JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
throws Exception {
try {
record.collect(new Tuple4<Integer, String, String, Integer>(
getInt(value, "timestamp"),
getString(value, "uuid"),
getString(value, "event"),
1));
} catch (JSONException e) {
System.err.println("Field not found");
}
}
}
[/CODE]
However, this time the issue was different. The programme ran correctly through to status FINISHED, but there was no output :-( (i.e., for each Task Manager, an empty file is written).
When we looked more closely at the input text file that is read using env.readTextFile(), we found that instead of a text string (the full text dataset), only a short string is written! Worse still, this string value sometimes remains the same over multiple runs of the programme.
Is this expected? Is this just a handle to the file or the dataset? Is the Collector also working correctly?
Note: the actual JSON file (i.e., the text file that should be read) looks like the following, with a two-level hierarchy for one field:
[JSON]{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}[/JSON]
So now, again, we are unsure whether we are doing it correctly :-(
Thanks in advance for helping us understand where we are going wrong.
Anirvan
From: "Stefano Bortoli" <[hidden email]>
To: "user" <[hidden email]>
Cc: [hidden email]
Sent: Tuesday, November 25, 2014 5:05:34 PM
Subject: Re: Program crashes trying to read JSON file
At a quick glance, it seems you are trying to sum on Strings:
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
Check your tuple types and make sure that you are not summing on strings.
2014-11-25 16:55 GMT+01:00 Anirvan BASU <[hidden email]>:
Hello all,
We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a Flink dataset (based on 3-tuples) and then performing some operations on it.
We encountered the following runtime error:
[QUOTE]Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
at org.apache.flink.client.program.Client.run(Client.java:244)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
... 6 more
[/QUOTE]
The code snippet that could have caused this error (i.e., the part we edited) is the following:
[CODE]import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...
public static void main(String[] args) throws Exception {
if(!parseParameters(args)) {
return;
}
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<String> text = getTextDataSet(env);
DataSet<Tuple3<Integer, String, String>> counts =
// split up the lines into 3-tuples containing: (timestamp, uuid, event)
text.flatMap(new SelectDataFlatMap())
// group by tuple field 2 (event) and sum up tuple field 2 (also event, a String)
.groupBy(2)
.sum(2);
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
} else {
counts.print();
}
// execute program
env.execute("Weblogs Programme");
}
...
public static class SelectDataFlatMap extends
JSONParseFlatMap<String, Tuple3<Integer, String, String>> {
@Override
public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
throws Exception {
try {
out.collect(new Tuple3<Integer, String, String>(
getInt(value, "timestamp"),
getString(value, "uuid"),
getString(value, "event")));
} catch (JSONException e) {
System.err.println("Field not found");
}
}
}
[/CODE]
The JSON file looks like the following, with a two-level hierarchy for one field:
[JSON]{timestamp: 1397731764 payload: {product: Younited uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd platform: native version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241 type: can-usage-v1 event: General,Login,Success}}
{timestamp: 1397731765 payload: {product: Younited uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e platform: native version: 7b4b767060b62537b63c5d10d911870a14d2b84e type: can-usage-v1 event: General,App,Opened}}[/JSON]
Thanks in advance for helping us understand where we are going wrong.
Anirvan
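The exception above comes from `.groupBy(2).sum(2)`: field 2 of the `Tuple3<Integer, String, String>` is the `event` String, and the built-in sum aggregation rejects non-numeric fields, as Stefano's reply points out. The corrected version quoted in the later reply adds an Integer count field set to 1 and sums that field instead. As a plain-Java sketch of that group-then-sum logic (not Flink code; the `Event` class, the `countPerEvent` helper and the sample records are hypothetical), grouping on the event String and summing only the numeric count looks like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventCount {

    // Hypothetical stand-in for Tuple4<Integer, String, String, Integer>:
    // (timestamp, uuid, event, count). This is NOT a Flink class.
    static final class Event {
        final int timestamp;
        final String uuid;
        final String event;
        final int count;

        Event(int timestamp, String uuid, String event, int count) {
            this.timestamp = timestamp;
            this.uuid = uuid;
            this.event = event;
            this.count = count;
        }
    }

    // Groups on the event String and adds up the Integer count of each group.
    // The String is only the grouping key; only the numeric field is summed,
    // which is the distinction the built-in sum aggregation enforces.
    static Map<String, Integer> countPerEvent(List<Event> events) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Event e : events) {
            totals.merge(e.event, e.count, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up input: every parsed line contributes a count of 1.
        List<Event> events = Arrays.asList(
                new Event(1397731764, "u1", "General,Login,Success", 1),
                new Event(1397731765, "u2", "General,App,Opened", 1),
                new Event(1397731766, "u1", "General,Login,Success", 1));

        // TreeMap iterates keys in sorted order, so this prints:
        // General,App,Opened 1
        // General,Login,Success 2
        for (Map.Entry<String, Integer> entry : countPerEvent(events).entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```

Note that in a grouped tuple aggregation only the grouping key and the summed field are meaningful in the result; the sketch therefore keeps just those two values.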