Error when running a left outer join program

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Error when running a left outer join program

hagersaleh
This post was updated on .
CONTENTS DELETED
The author has deleted this message.
Reply | Threaded
Open this post in threaded view
|

Re: Error when running a left outer join program

rmetzger0
Hi,

what data are you using?

The exception says "NullFieldException: Field 1 is null, but expected to hold a value.". Maybe the data is not in the right format?

On Mon, Apr 27, 2015 at 2:32 PM, hagersaleh <[hidden email]> wrote:
I want to implement a left outer join of two datasets; I am using the Tuple data type.


package org.apache.flink.examples.java.relational;


import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

@SuppressWarnings("serial")
public class TPCHQuery3 {

// field name in customer table

public static class LeftOuterJoin implements
CoGroupFunction<Tuple2<Tuple1<Integer>, String>,
Tuple2<Tuple1<Integer>, String>,
Tuple2<Tuple1<Integer>,Tuple1<Integer>>> {

    @Override
    public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>>
leftElements,
                        Iterable<Tuple2<Tuple1<Integer>, String>>
rightElements,

Collector<Tuple2<Tuple1<Integer>,Tuple1<Integer>>> out) throws
Exception {



            for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements)
{
                    boolean hadElements = false;
                    for (Tuple2<Tuple1<Integer>, String> rightElem :
rightElements) {
                            out.collect(new
Tuple2<Tuple1<Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0));
                            hadElements = true;
                    }
                    if (!hadElements) {
                            out.collect(new Tuple2<Tuple1<Integer>,
Tuple1<Integer>>(leftElem.f0, null));
                    }
            }

    }
  }

public static void main(String[] args) throws Exception {


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1<Integer>>
leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
                                        .fieldDelimiter('|')

.includeFields("10000000").ignoreFirstLine()
                                        .types(Integer.class);

   // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
    DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map(
        new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
String>>() {
                @Override
                public Tuple2<Tuple1<Integer>, String>
map(Tuple1<Integer> x) throws Exception {
                        return new Tuple2<Tuple1<Integer>, String>(x,
"some data");
                }
        });
DataSet<Tuple1<Integer>>
rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
                                        .fieldDelimiter('|')

.includeFields("010000000").ignoreFirstLine()
                                        .types(Integer.class);
   // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8, 9,
10);
    DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 =
rightSide.map(
        new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
String>>() {
                @Override
                public Tuple2<Tuple1<Integer>, String>
map(Tuple1<Integer> x) throws Exception {
                        return new Tuple2<Tuple1<Integer>, String>(x,
"some other data");
                }
        });
    DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin =
leftSide2.coGroup(rightSide2)
            .where(0)
            .equalTo(0)
            .with(new LeftOuterJoin());

    leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
"\n", "|");;
    env.execute();

}


Error output after running the program:
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException:
org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
hold a value.
    at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
    at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at
org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at
org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
    at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
    at
org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
    at
org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
    at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
    at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
    at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:724)

    at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
    at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
    at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
    at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
    at
org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Error when running a left outer join program

hagersaleh
Implementing a left outer join of two datasets, Customer and Orders,
using the Tuple data type.
Reply | Threaded
Open this post in threaded view
|

Re: Error when running a left outer join program

hagersaleh
I solved my problem — thank you very much.