RichInputFormat working differently in eclipse and in flink cluster


RichInputFormat working differently in eclipse and in flink cluster

Teena Kappen // BPRISE

Hi all,

 

I have implemented a RichInputFormat for reading the result of aggregation queries in Elasticsearch. There are around 100,000 buckets, which come back as a JSON array. Note: this is a one-time response.

 

My idea here is to iterate over these arrays in parallel. Here is the pseudocode.

 

public void configure(Configuration parameters) {
    System.out.println("configure");
}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
}

public ResponseInputSplit[] createInputSplits(int minNumSplits) {
    System.out.println("createInputSplits");
    // read from elastic
    // add buckets to array
}

public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
    // this is the default
    System.out.println("getInputSplitAssigner");
    return new DefaultInputSplitAssigner(inputSplits);
}

public void open(ResponseInputSplit split) {
    // read buckets
}

public boolean reachedEnd() {
    System.out.println("reachedEnd");
}

public Bounce nextRecord(Bounce reuse) {
}

public void close() {
}

// my main method
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());

 

When running in Eclipse, it executes createInputSplits and the results look fine. Logs are given below.

Output is:

configure

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...

configure

createInputSplits

 

When submitting the job to the Flink cluster, it doesn't execute the 'configure' and 'createInputSplits' methods. Instead, it goes directly to the nextRecord function. Logs are given below.

Output is:

Starting execution of program

configure

Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.

Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx/user/jobmanager#1219973491] with leader session id...

10/26/2018 15:05:57     Job execution switched to status RUNNING.

10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED

10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING

10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING

10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED

java.lang.NullPointerException

                               at com.xxx.test.MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)
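One plausible reading of this trace, sketched below under assumptions: on a cluster, createInputSplits runs on the JobManager, while open/nextRecord run on TaskManagers against a deserialized copy of the InputFormat, so 'createInputSplits' never appears in the client's stdout, and any field populated only on the client side is null by the time nextRecord runs. The minimal plain-Java sketch below (the BucketReader class and its names are hypothetical, not Flink API) shows the defensive pattern: initialize all per-split state in open() and guard it in reachedEnd().

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the InputFormat lifecycle: open() rebuilds the
// per-split state on the worker, so nextRecord() never sees a null iterator.
class BucketReader {
    private Iterator<String> buckets;  // transient state, rebuilt per split

    // Analogous to RichInputFormat#open: called once per split on the worker.
    void open(List<String> split) {
        this.buckets = split.iterator();
    }

    // Analogous to InputFormat#reachedEnd: true when there is nothing to read,
    // including the case where open() was never called on this instance.
    boolean reachedEnd() {
        return buckets == null || !buckets.hasNext();
    }

    // Analogous to InputFormat#nextRecord: only invoked while !reachedEnd().
    String nextRecord() {
        return buckets.next();
    }
}
```

The point of the sketch is that every field nextRecord depends on must be (re)initialized in open() on the worker; anything set only in createInputSplits stays on the JobManager side and is not carried into the serialized copy the TaskManager uses.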

 

Regards,

Teena

 


Re: RichInputFormat working differently in eclipse and in flink cluster

Till Rohrmann
Hi Teena,

which Flink version are you using? Have you tried whether this happens with the latest release 1.6.2 as well?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <[hidden email]> wrote:


 


RE: RichInputFormat working differently in eclipse and in flink cluster

Teena Kappen // BPRISE

Hi Till,

 

We are using 1.4.0 and have not tried any other releases.

 

We will try this on 1.6.2 and see what happens.

 

Thank you.

 

Regards,

Teena

 

From: Till Rohrmann <[hidden email]>
Sent: 07 November 2018 20:23
To: Teena Kappen // BPRISE <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: RichInputFormat working differently in eclipse and in flink cluster

 
