coGroup Iterator NoSuchElement

coGroup Iterator NoSuchElement

Mustafa Elbehery
Hi, 

I am trying to write two coGroups in sequence in the same ETL job. I use a common dataset in both of them: in the first coGroup I update the initial dataset and retrieve the result as a new DataSet object. Then I use that result in the second coGroup together with another new dataset.

While debugging, I could see that the iterator inside the coGroup reports no next element; however, in the next iteration it has elements. I tried forcing object reuse to be enabled, and then I got only half of the expected result. I have attached a screenshot of the debugger.

My question is: is this related to the concurrent execution of different tasks in Flink? And how can I solve this problem?

Regards.


--
Mustafa Elbehery
+49(0)15750363097
skype: mustafaelbehery87


Attachment: Selection_022.png (103K)

Re: coGroup Iterator NoSuchElement

Mustafa Elbehery
Code Snippet :) 

DataSet<Person> updatedPersonOne = inPerson.coGroup(inStudent)
        .where("name").equalTo("name")
        .with(new ComputeStudiesProfile());

DataSet<Person> updatedPersonTwo = updatedPersonOne.coGroup(inJobs)
        .where("name").equalTo("name")
        .with(new ComputeJobsProfile());

updatedPersonTwo.print();


Re: coGroup Iterator NoSuchElement

Stephan Ewen
Hi!

The code snippet is not very revealing. Can you also show the implementations of the CoGroupFunctions?

Thanks!


Re: coGroup Iterator NoSuchElement

Mustafa Elbehery
Hi,


import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

public static class ComputeStudiesProfile implements CoGroupFunction<Person, StudentInfo, Person> {

    Person person;

    @Override
    public void coGroup(Iterable<Person> iterable, Iterable<StudentInfo> iterable1, Collector<Person> collector) throws Exception {

        Iterator<Person> iterator = iterable.iterator();
        // Throws NoSuchElementException if this key has no Person (empty first group).
        person = iterator.next();

        // Buffer all StudentInfo records that share this key.
        ArrayList<StudentInfo> infos = new ArrayList<StudentInfo>();
        Iterator<StudentInfo> infosIterator = iterable1.iterator();

        while (infosIterator.hasNext())
            infos.add(infosIterator.next());

        // Emit the person only if at least one StudentInfo matched.
        if (infos.size() > 0) {
            update(person, infos, collector);
        }
    }

    public void update(Person person, Collection<StudentInfo> infos, Collector<Person> collector) {
        // Take the major from the first StudentInfo and merge the courses of all of them.
        person.setMajor(infos.iterator().next().getMajor());
        for (StudentInfo info : infos) {
            person.getBestCourse().addAll(info.getCourses());
        }
        collector.collect(person);
    }
}

*******************************************************************************************************

public static class ComputeJobsProfile implements CoGroupFunction<Person, StudentJobs, Person> {

    @Override
    public void coGroup(Iterable<Person> iterable, Iterable<StudentJobs> iterable1, Collector<Person> collector) throws Exception {

        // Throws NoSuchElementException if this key has no Person (empty first group).
        Person person = iterable.iterator().next();

        ArrayList<StudentJobs> jobs = new ArrayList<StudentJobs>();
        for (StudentJobs job : iterable1) {
            jobs.add(job);
        }
        // Emit the person only if at least one StudentJobs record matched.
        if (jobs.size() > 0) {
            update(person, jobs, collector);
        }
    }

    public void update(Person person, Collection<StudentJobs> jobs, Collector<Person> collector) {

        for (StudentJobs job : jobs) {
            person.getJobs().addAll(job.getJobs());
        }
        collector.collect(person);
    }
}


Re: coGroup Iterator NoSuchElement

Fabian Hueske
Hi,

One of the iterables passed to a CoGroupFunction can be empty. Calling iterator.next() on an empty iterator raises a NoSuchElementException.
This is the expected behavior of the function.

Are you sure your assumptions about your data are correct, i.e., that the iterator should always have (at least) one element?

Fabian
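A minimal plain-Java sketch of the behavior described above, with no Flink involved: an empty `List` stands in for an empty coGroup side, and the class and method names (`EmptyGroupDemo`, `firstUnguarded`, `firstOrNull`) are hypothetical. It shows that calling `next()` on an empty iterator throws `NoSuchElementException`, while checking `hasNext()` first avoids it.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class: plain Java, an empty List plays the role of an empty coGroup side.
class EmptyGroupDemo {

    // Unguarded access, like iterable.iterator().next() in the original functions.
    static <T> T firstUnguarded(Iterable<T> group) {
        return group.iterator().next(); // throws NoSuchElementException on an empty group
    }

    // Guarded access: check hasNext() before calling next(); returns null for an empty group.
    static <T> T firstOrNull(Iterable<T> group) {
        Iterator<T> it = group.iterator();
        return it.hasNext() ? it.next() : null;
    }

    // Returns true if the unguarded call throws NoSuchElementException for an empty group.
    static boolean unguardedThrowsOnEmpty() {
        List<String> empty = Collections.emptyList();
        try {
            firstUnguarded(empty);
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }
}
```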



Re: coGroup Iterator NoSuchElement

Mustafa Elbehery
Yes, it's working now. But my assumption is that I want to join different datasets on a common key, so it is normal to have many tuples on one side that do not exist on the other side.

How do I fix that?


Re: coGroup Iterator NoSuchElement

Stephan Ewen
The regular join has the semantics of an inner join: it filters out cases where no matching tuple is found on one side.

CoGroup follows the semantics of an outer join on groups: it also delivers groups that are empty on one side.
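To make the difference concrete, here is a plain-Java simulation that treats each input as a list of join keys. It is not Flink's implementation, and the class and method names are hypothetical. The coGroup variant yields one group per distinct key from either input, possibly with an empty side, while the inner join yields pairs only for keys present on both sides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the grouping semantics; each input is just a list of keys (e.g. names).
class CoGroupVsJoin {

    // coGroup: one entry per distinct key from EITHER input; a side may have size 0.
    // Returns key -> {size of left group, size of right group}, sorted by key.
    static Map<String, int[]> coGroupSizes(List<String> left, List<String> right) {
        Map<String, int[]> groups = new TreeMap<>();
        for (String k : left) {
            groups.computeIfAbsent(k, x -> new int[2])[0]++;
        }
        for (String k : right) {
            groups.computeIfAbsent(k, x -> new int[2])[1]++;
        }
        return groups;
    }

    // Inner join: one output per matching (left, right) pair; unmatched keys disappear.
    static List<String> innerJoin(List<String> left, List<String> right) {
        List<String> pairs = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                if (l.equals(r)) {
                    pairs.add(l + "|" + r);
                }
            }
        }
        return pairs;
    }
}
```

For example, with persons `alice, bob` and students `bob, carol`, `coGroupSizes` returns three groups (`alice` with an empty right side, `bob` with one element on each side, `carol` with an empty left side), whereas `innerJoin` returns only `bob|bob`.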


Re: coGroup Iterator NoSuchElement

Fabian Hueske
In reply to this post by Mustafa Elbehery
I am not sure I understood your question correctly.

You can easily prevent the NoSuchElementException by calling next() only if hasNext() returns true.
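Applied to the `ComputeStudiesProfile` function above, the `hasNext()` check might look like the sketch below. It is plain Java so that it runs without Flink: `Person` and `StudentInfo` are hypothetical minimal stand-ins for the thread's POJOs, and a `java.util.function.Consumer<Person>` takes the place of Flink's `Collector<Person>`. It keeps the original inner-join behavior: nothing is emitted when either side of the group is empty.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins: Person and StudentInfo are minimal versions of the thread's POJOs,
// and Consumer<Person> plays the role of Flink's Collector<Person>.
class GuardedStudiesProfile {

    static class Person {
        final String name;
        String major;
        final List<String> bestCourses = new ArrayList<>();

        Person(String name) {
            this.name = name;
        }
    }

    static class StudentInfo {
        final String major;
        final List<String> courses;

        StudentInfo(String major, List<String> courses) {
            this.major = major;
            this.courses = courses;
        }
    }

    // Same parameter shape as CoGroupFunction.coGroup(first, second, out).
    static void coGroup(Iterable<Person> persons, Iterable<StudentInfo> infos, Consumer<Person> out) {
        Iterator<Person> personIt = persons.iterator();
        if (!personIt.hasNext()) {
            return; // key exists only on the StudentInfo side: no Person to update
        }
        Person person = personIt.next();

        boolean matched = false;
        for (StudentInfo info : infos) {
            if (!matched) {
                person.major = info.major; // major comes from the first StudentInfo, as in the original
                matched = true;
            }
            person.bestCourses.addAll(info.courses);
        }

        if (matched) {
            out.accept(person); // emit only if at least one StudentInfo matched (inner-join behavior)
        }
    }
}
```

The sketch also folds the `StudentInfo` group in a single pass instead of buffering the records in an `ArrayList`. That may be relevant to the object-reuse observation in the first message: Flink's documentation advises against remembering input objects obtained from an `Iterable` when object reuse is enabled, which the original functions do.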


Re: coGroup Iterator NoSuchElement

Mustafa Elbehery
@Stephan, I was trying to follow the concept of the nest join. In other words, I wanted to follow a certain implementation to achieve my goal.

@Fabian, solving the exception this way will lead to incorrect results: since the key will always exist on one side, the iterator of the other side will continue to emit results.

Thanks for your help. I will dig further and check for alternatives.
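Whether a key that exists on only one side produces output is decided inside the coGroup function itself. Both original functions emit a `Person` only when the second input has at least one match, so chaining them keeps only persons that match in all three inputs. If persons without jobs should survive the second step, a left-outer variant could look like the plain-Java sketch below. `Person` and `StudentJobs` are hypothetical minimal stand-ins, and `Consumer<Person>` replaces Flink's `Collector<Person>`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins: Person and StudentJobs are minimal versions of the thread's POJOs,
// and Consumer<Person> plays the role of Flink's Collector<Person>.
class LeftOuterJobsProfile {

    static class Person {
        final String name;
        final List<String> jobs = new ArrayList<>();

        Person(String name) {
            this.name = name;
        }
    }

    static class StudentJobs {
        final List<String> jobs;

        StudentJobs(List<String> jobs) {
            this.jobs = jobs;
        }
    }

    // Empty Person side: the key exists only in the jobs input, so nothing is emitted.
    // Empty jobs side: each Person is still emitted, unchanged (left-outer behavior).
    static void coGroup(Iterable<Person> persons, Iterable<StudentJobs> jobGroup, Consumer<Person> out) {
        // Gather all job titles for this key once, so every Person in the group receives them.
        List<String> allJobs = new ArrayList<>();
        for (StudentJobs j : jobGroup) {
            allJobs.addAll(j.jobs);
        }
        for (Person person : persons) { // body never runs if the Person side is empty
            person.jobs.addAll(allJobs);
            out.accept(person);
        }
    }
}
```

Keys that appear only in the jobs input still produce no output, because the loop over the `Person` side never runs, while a `Person` with no jobs passes through unchanged.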
