Program crashes trying to read JSON file

Posted by Anirvan BASU on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Program-crashes-trying-to-read-JSON-file-tp463.html

Hello all,

We are using Flink 0.7 and trying to read a large JSON file, reading some fields into a Flink (3-tuple based) DataSet, and then performing some operations on it.

We encountered the following runtime error:

[QUOTE]
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
    at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
    at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
    at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
    at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more
[/QUOTE]



The code snippet that could have caused this error (i.e. the part that we edited) is the following:

[CODE]

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
...

    public static void main(String[] args) throws Exception {
        
        if(!parseParameters(args)) {
            return;
        }
        
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // get input data
        DataSet<String> text = getTextDataSet(env);
        
        DataSet<Tuple3<Integer, String, String>> counts =
                // split up the lines in pairs (3-tuples) containing: (timestamp,uuid,event)
                text.flatMap(new SelectDataFlatMap())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(2)
                .sum(2);

        // emit result
        if(fileOutput) {
            counts.writeAsCsv(outputPath, "\n", " ");
        } else {
            counts.print();
        }
        
        // execute program
        env.execute("Weblogs Programme");
    }

...

    public static class SelectDataFlatMap extends
    JSONParseFlatMap<String, Tuple3<Integer, String, String>> {

        @Override
        public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
                throws Exception {
            try {
                out.collect(new Tuple3<Integer, String, String>(
                        getInt(value, "timestamp"),
                        getString(value, "uuid"),
                        getString(value, "event")));
            } catch (JSONException e) {
                System.err.println("Field not found");
            }
        }
    }
       
[/CODE]



[QUOTE]
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
    at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
    at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
    at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
    at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more
[/QUOTE]


The JSON file is of the following nature, with a 2-level hierarchy for one field:
[JSON]
{timestamp: 1397731764     payload: {product: Younited     uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd     platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241     type: can-usage-v1     event: General,Login,Success}}
{timestamp: 1397731765     payload: {product: Younited     uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e     platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e     type: can-usage-v1     event: General,App,Opened}}
[/JSON]



Thanks in advance for helping us to understand where we are going wrong.

Anirvan