Re: Program crashes trying to read JSON file

Posted by Anirvan BASU on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Program-crashes-trying-to-read-JSON-file-tp463p473.html

Thanks to Aljoscha and Stefano for pointing out the flaw.

We corrected the issue as follows:

[CODE]

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

    public static void main(String[] args) throws Exception {
        
        if(!parseParameters(args)) {
            return;
        }
        
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // get input data
        DataSet<String> text = getTextDataSet(env);
        
        DataSet<Tuple4<Integer, String, String, Integer>> counts =
                // split each line into a 4-tuple: (timestamp, uuid, event, count)
                text.flatMap(new SelectDataFlatMap())
                // group by tuple field "1" (the uuid, a String key) and sum tuple field "3" (the Integer count of 1)
                .groupBy(1)
                .sum(3);


        // emit result
        if(fileOutput) {
            counts.writeAsCsv(outputPath, "\n", " ");
        } else {
            counts.print();
        }
        
        // execute program
        env.execute("Weblogs Programme");
    }

...

    public static class SelectDataFlatMap extends
    JSONParseFlatMap<String, Tuple4<Integer, String, String, Integer>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<Tuple4<Integer, String, String, Integer>> record)
                throws Exception {
            try {
                record.collect(new Tuple4<Integer, String, String, Integer>(
                        getInt(value, "timestamp"),
                        getString(value, "uuid"),
                        getString(value, "event"),
                        1));
            } catch (JSONException e) {
                System.err.println("Field not found");
            }
        }
    }

      
[/CODE]

However, this time the issue was different.
The programme executed correctly up to status FINISHED, but there was no output :-(
i.e. for each Task Manager, an empty file is written.

When we inspected the input read via env.readTextFile(), we found that instead of the full text dataset only a short string is written!
Something like:
[hidden email]

Worse still, this string value sometimes stays the same over multiple runs of the programme.
Is this normal? Is it just a handle to the file or the dataset, rather than its contents?
Is the Collector working correctly as well?
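
For reference, here is a minimal sketch of how one could spot-check what readTextFile() actually delivers (the input path is just a placeholder); a DataSet is only a lazy handle until a sink is executed, so real lines should appear only once the job runs:

[CODE]

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadTextFileCheck {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // readTextFile() only builds a lazy DataSet; printing the DataSet object
        // itself (e.g. its toString) shows an operator handle, not the file contents
        DataSet<String> text = env.readTextFile("file:///path/to/weblogs.json");  // placeholder path

        // print() adds a sink that writes every input line to stdout,
        // so the actual contents appear only after the job is executed
        text.print();

        env.execute("readTextFile check");
    }
}

[/CODE]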


Note :
The actual JSON file (i.e. the text file that should be read) looks like the following, with a two-level hierarchy for one field:
[JSON]
{timestamp: 1397731764     payload: {product: Younited     uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd     platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241     type: can-usage-v1     event: General,Login,Success}}
{timestamp: 1397731765     payload: {product: Younited     uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e     platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e     type: can-usage-v1     event: General,App,Opened}}
[/JSON]
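
For completeness, assuming the input lines are actually valid JSON (with quoted keys and comma-separated fields, unlike the pretty-printed excerpt above), a rough sketch of a flatMap that unwraps the nested payload object directly with the Sling JSON classes could look like this (illustration only, not what we currently run):

[CODE]

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;

    public static class SelectNestedDataFlatMap
            implements FlatMapFunction<String, Tuple4<Integer, String, String, Integer>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String line, Collector<Tuple4<Integer, String, String, Integer>> out)
                throws Exception {
            try {
                JSONObject json = new JSONObject(line);
                // "payload" is a nested object, so it has to be unwrapped
                // before its fields (uuid, event, ...) can be read
                JSONObject payload = json.getJSONObject("payload");
                out.collect(new Tuple4<Integer, String, String, Integer>(
                        json.getInt("timestamp"),
                        payload.getString("uuid"),
                        payload.getString("event"),
                        1));
            } catch (JSONException e) {
                // skip malformed lines instead of failing the whole job
                System.err.println("Skipping malformed line: " + e.getMessage());
            }
        }
    }

[/CODE]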


So once again, we are confused about whether we are doing this correctly :-(

Thanks in advance for helping us to understand where we are going wrong.
Anirvan


From: "Stefano Bortoli" <[hidden email]>
To: "user" <[hidden email]>
Cc: [hidden email]
Sent: Tuesday, November 25, 2014 5:05:34 PM
Subject: Re: Program crashes trying to read JSON file

Very quickly, it seems you are trying to sum on Strings

Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.

Check your tuple types and be sure that you are not summing on strings.
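
In other words, the built-in sum() must target a numeric tuple field; the String field can only serve as the grouping key. A minimal WordCount-style sketch of that pattern (the sample values are just illustrative) would be:

[CODE]

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SumOnNumericField {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (event, 1) pairs: the String field is the grouping key,
        // the Integer field carries the value that gets summed
        DataSet<Tuple2<String, Integer>> events = env.fromElements(
                new Tuple2<String, Integer>("General,Login,Success", 1),
                new Tuple2<String, Integer>("General,App,Opened", 1),
                new Tuple2<String, Integer>("General,Login,Success", 1));

        DataSet<Tuple2<String, Integer>> counts = events
                .groupBy(0)   // grouping on a String field is fine
                .sum(1);      // summing the Integer field is fine
                              // .sum(0) would throw UnsupportedAggregationTypeException

        counts.print();
        env.execute("sum on a numeric field");
    }
}

[/CODE]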


2014-11-25 16:55 GMT+01:00 Anirvan BASU <[hidden email]>:
Hello all,

We are using Flink 0.7 and trying to read a large JSON file, extract some fields into a Flink DataSet of 3-tuples, and then perform some operations on it.

We encountered the following runtime error:

[QUOTE]
Error: The main method caused an error.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:404)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:307)
    at org.apache.flink.client.program.Client.run(Client.java:244)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
Caused by: org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException: The type java.lang.String has currently not supported for built-in sum aggregations.
    at org.apache.flink.api.java.aggregation.SumAggregationFunction$SumAggregationFunctionFactory.createAggregationFunction(SumAggregationFunction.java:186)
    at org.apache.flink.api.java.operators.AggregateOperator.<init>(AggregateOperator.java:109)
    at org.apache.flink.api.java.operators.UnsortedGrouping.aggregate(UnsortedGrouping.java:61)
    at org.apache.flink.api.java.operators.UnsortedGrouping.sum(UnsortedGrouping.java:72)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:389)
    ... 6 more
[/QUOTE]



The code snippet that could have caused this error (i.e. the part we edited) is the following:

[CODE]

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.sling.commons.json.JSONException;
...

    public static void main(String[] args) throws Exception {
        
        if(!parseParameters(args)) {
            return;
        }
        
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // get input data
        DataSet<String> text = getTextDataSet(env);
        
        DataSet<Tuple3<Integer, String, String>> counts =
                // split each line into a 3-tuple: (timestamp, uuid, event)
                text.flatMap(new SelectDataFlatMap())
                // group by tuple field "2" and sum tuple field "2" (event, which is a String)
                .groupBy(2)
                .sum(2);

        // emit result
        if(fileOutput) {
            counts.writeAsCsv(outputPath, "\n", " ");
        } else {
            counts.print();
        }
        
        // execute program
        env.execute("Weblogs Programme");
    }

...

    public static class SelectDataFlatMap extends
    JSONParseFlatMap<String, Tuple3<Integer, String, String>> {

        @Override
        public void flatMap(String value, Collector<Tuple3<Integer, String, String>> out)
                throws Exception {
            try {
                out.collect(new Tuple3<Integer, String, String>(
                        getInt(value, "timestamp"),
                        getString(value, "uuid"),
                        getString(value, "event")));
            } catch (JSONException e) {
                System.err.println("Field not found");
            }
        }
    }
       
[/CODE]



The JSON file looks like the following, with a two-level hierarchy for one field:
[JSON]
{timestamp: 1397731764     payload: {product: Younited     uuid: 754726549cec3968a60ffffa2f8ccdc1da27e57a01af4bb2b11841803a25eadd     platform: native     version: 6aa54aca95fb1b8ef2290136ab12df2e4b011241     type: can-usage-v1     event: General,Login,Success}}
{timestamp: 1397731765     payload: {product: Younited     uuid: e0b3dad557ca77dc035fd22d1e8608c4248526ab9318a85637dbf88228a4034e     platform: native     version: 7b4b767060b62537b63c5d10d911870a14d2b84e     type: can-usage-v1     event: General,App,Opened}}
[/JSON]



Thanks in advance for helping us to understand where we are going wrong.

Anirvan