Informing the runtime about data already repartitioned using "output contracts"

Informing the runtime about data already repartitioned using "output contracts"

Mustafa Elbehery
Hi, 

I am writing a Flink job in which I have three datasets. I have applied partitionByHash to the first two before coGrouping them.

My plan is to spill the result of the coGroup to disk, and then re-read it before coGrouping it with the third dataset.

My question is: is there any way to inform Flink that the first coGroup result is already partitioned? I know I can re-partition again before coGrouping, but I would like to know if there is a way to avoid a step that was already executed.

Regards.

--
Mustafa Elbehery
+49(0)15750363097
skype: mustafaelbehery87


Re: Informing the runtime about data already repartitioned using "output contracts"

Fabian Hueske
Hi Mustafa,

I'm afraid this is not possible.
Although you can annotate DataSources with partitioning information, this is not enough to avoid repartitioning for a CoGroup. The reason is that CoGroup requires co-partitioning of both inputs, i.e., both inputs must be partitioned in the same way (same number of partitions, same partitioning function, same location of partitions). Since Flink assigns tasks to execution slots dynamically, it is not possible to co-locate data that was read from a data source with data coming from the result of another computation.

If you just need the result of the first co-group on disk, you could also build a single program that does both co-groups and additionally writes the result of the first co-group to disk (Flink supports multiple data sinks).
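
For illustration, a rough sketch of such a single program with two sinks. The dataset names, input paths, and the UDFs (FirstCoGroupFn, SecondCoGroupFn) are placeholders, not taken from your job:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Placeholder inputs standing in for the three datasets.
DataSet<Tuple2<String, Integer>> first  = env.readCsvFile("file:///tmp/in1").types(String.class, Integer.class);
DataSet<Tuple2<String, Integer>> second = env.readCsvFile("file:///tmp/in2").types(String.class, Integer.class);
DataSet<Tuple2<String, Integer>> third  = env.readCsvFile("file:///tmp/in3").types(String.class, Integer.class);

DataSet<Tuple2<String, Integer>> firstResult = first
        .coGroup(second).where(0).equalTo(0)
        .with(new FirstCoGroupFn());   // placeholder UDF

// Sink 1: persist the intermediate result to disk.
firstResult.writeAsCsv("file:///tmp/first-cogroup", FileSystem.WriteMode.OVERWRITE);

// Sink 2: the second co-group consumes the in-flight result, so the optimizer
// knows how it is partitioned and can avoid a second shuffle on that input.
firstResult.coGroup(third).where(0).equalTo(0)
        .with(new SecondCoGroupFn())   // placeholder UDF
        .writeAsCsv("file:///tmp/final-result", FileSystem.WriteMode.OVERWRITE);

env.execute("two co-groups, two sinks");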

Best, Fabian


Re: Informing the runtime about data already repartitioned using "output contracts"

Alexander Alexandrov
Thanks for the feedback, Fabian.

This is related to the question I sent to the user mailing list yesterday. Mustafa is working on a master's thesis in which we try to abstract an operator for updating stateful datasets (decoupled from the current native iterations logic) and use it in conjunction with lazy unrolling of iterations.

The assumptions are as follows:
  • Each iteration runs a job with the same structure and the same DOP;
  • Updates are realized through a coGroup with a fixed DOP (let's say N), which consumes a (state, updates) pair of datasets and produces a new version of the state (let's call it state'); a sketch of this step follows below;
  • We keep track of where the N output partitions of state' are located and use this information for local placement of the corresponding N DataSource tasks in the next iteration (via FLINK-1478);
  • The remaining piece of the puzzle is to figure out how to tell the coGroup that one of the inputs is already partitioned so it avoids an unnecessary shuffle.
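
To make the update step concrete, a minimal sketch of what we have in mind; the Tuple2<Long, String> state type, the field positions, and the merge logic are placeholders for whatever the prototype ends up using:

// state' = coGroup(state, updates) by key; all types here are placeholders.
DataSet<Tuple2<Long, String>> statePrime = state
        .coGroup(updates)
        .where(0).equalTo(0)   // co-group state and updates on the key field
        .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>>() {
            @Override
            public void coGroup(Iterable<Tuple2<Long, String>> stateSide,
                                Iterable<Tuple2<Long, String>> updateSide,
                                Collector<Tuple2<Long, String>> out) {
                // Keep the update if one arrived for this key, else the old state.
                Tuple2<Long, String> latest = null;
                for (Tuple2<Long, String> s : stateSide)  { latest = s; }
                for (Tuple2<Long, String> u : updateSide) { latest = u; }
                if (latest != null) { out.collect(latest); }
            }
        });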

If I remember correctly, back in the day we had a PACT output contract that served a similar purpose (avoiding unnecessary shuffles), but I was not able to find it yesterday.

In any case, even if this does not work out of the box at the moment, I think most of the logic is in place (e.g., co-location groups in the scheduler), and we are willing to either hack the code or add the missing functionality in order to realize the goal described above.

Suggestions are welcome!

Regards,
Alex

Re: Informing the runtime about data already repartitioned using "output contracts"

Fabian Hueske
Alright, so if both inputs of the CoGroup are read from the file system, there should be a way to do the co-group on co-located data without repartitioning.
In fact, I have some code lying around for doing co-located joins from the local FS [1]. I haven't tested it thoroughly, and it relies on a number of assumptions. If the data is also sorted, you can even get around sorting it if you inject a few lines into the optimizer (see the change for FLINK-1444) and ensure that each source reads exactly one input split.

Regarding your question about the PACT output contracts, there were three types, defined with respect to a key/value pair data model:
- Same key: the UDF does not modify the key
- Super key: the UDF extends the key (partitioning remains valid, sorting does not)
- Unique key: keys from the UDF or source are unique
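
For what it's worth, the closest thing in the current DataSet API are semantic annotations such as @ForwardedFields, which declare that a UDF leaves certain fields, and hence an existing partitioning on them, untouched. A minimal sketch (CountDoubler is a made-up example class):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

// "Same key" in PACT terms: field 0 passes through unchanged, so a
// hash-partitioning on field 0 stays valid after this map.
@ForwardedFields("f0")
public class CountDoubler implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
        return new Tuple2<>(value.f0, value.f1 * 2);
    }
}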

Let me know if you have questions.
Cheers, Fabian


Re: Informing the runtime about data already repartitioned using "output contracts"

Mustafa Elbehery
Hi folks,

I am reviving this thread, as I am stuck on one step towards my target.

The following code partitions the data before coGrouping and then writes the result to disk. I am trying to re-read the data from disk, so I create a LocatableInputSplit[] array with the size of the DOP. Find the code below:

// Partition both inputs on "name", coGroup them, and write the result to disk.
inPerson.partitionByHash("name")
        .map(new TrackHost())
        .coGroup(inStudent.partitionByHash("name"))
        .where("name").equalTo("name")
        .with(new ComputeStudiesProfile())
        .write(new TextOutputFormat<>(new Path()),
               "file:///home/mustafa/Documents/tst/",
               FileSystem.WriteMode.OVERWRITE);

// One split per parallel task; split numbers must be distinct.
LocatableInputSplit[] splits = new LocatableInputSplit[env.getParallelism()];
splits[0] = new LocatableInputSplit(0, "localhost");
splits[1] = new LocatableInputSplit(1, "localhost");
splits[2] = new LocatableInputSplit(2, "localhost");

DataSet<Person> secondIn = env
        .createInput(new MutableInputFormatTest(new Path("file:///home/mustafa/Documents/tst/1"), splits))
        .map(new PersonMapper());
secondIn.print();


TrackHost is an accumulator that tracks the host information, and MutableInputFormat is a custom input format that extends TextInputFormat and implements StrictlyLocalAssignment.
I keep the LocatableInputSplit[] as an instance field, because introducing my own split type in createInputSplits conflicts with the method inherited from TextInputFormat: both would have the same signature, and the compiler complained about that.
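
Roughly, the format looks like this (a simplified, untested sketch of the design described above, not the exact code; the createInputSplits override reflects my intent):

// Simplified sketch of the custom format; not the exact code.
public class MutableInputFormatTest extends TextInputFormat
        implements StrictlyLocalAssignment {

    // Pre-built splits kept in an instance field, since the createInputSplits
    // signature is fixed by FileInputFormat.
    private final LocatableInputSplit[] splits;

    public MutableInputFormatTest(Path filePath, LocatableInputSplit[] splits) {
        super(filePath);
        this.splits = splits;
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        // Re-wrap the desired host assignments as FileInputSplits, because the
        // inherited return type must be FileInputSplit[]. Start 0 and length -1
        // mean "read the whole file".
        FileInputSplit[] fileSplits = new FileInputSplit[splits.length];
        for (int i = 0; i < splits.length; i++) {
            fileSplits[i] = new FileInputSplit(
                    splits[i].getSplitNumber(), getFilePath(), 0, -1,
                    splits[i].getHostnames());
        }
        return fileSplits;
    }
}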

Again, while debugging I could see the problem in ExecutionJobVertex, line 146: the execution ignores the locations I am shipping with my splits and re-creates the input splits, which somehow get their host info (machine name) from the execution, while the task managers prepared by the scheduler wait for a machine named "localhost".
Any suggestions?
Regards.
--
Mustafa Elbehery
+49(0)15750363097
skype: mustafaelbehery87


Re: Informing the runtime about data already repartitioned using "output contracts"

Stephan Ewen
I think we are still talking about the same issue as in a related question. I suspect that the MutableInputFormatTest does not properly return the splits in the "createInputSplits()" function.

To validate that, you can write yourself a unit test that checks whether the input format returns your splits from the method "createInputSplits()".
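
For example, something along these lines (a sketch only; it assumes your format hands back the splits it was constructed with):

import static org.junit.Assert.assertEquals;

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.LocatableInputSplit;
import org.junit.Test;

public class MutableInputFormatSplitTest {

    @Test
    public void testCreateInputSplitsReturnsCustomSplits() throws Exception {
        LocatableInputSplit[] expected = new LocatableInputSplit[3];
        for (int i = 0; i < expected.length; i++) {
            expected[i] = new LocatableInputSplit(i, "localhost");
        }

        MutableInputFormatTest format = new MutableInputFormatTest(
                new Path("file:///home/mustafa/Documents/tst/1"), expected);

        // The format should return the splits it was constructed with,
        // including their split numbers and host names.
        LocatableInputSplit[] actual = format.createInputSplits(expected.length);

        assertEquals(expected.length, actual.length);
        for (int i = 0; i < actual.length; i++) {
            assertEquals(i, actual[i].getSplitNumber());
            assertEquals("localhost", actual[i].getHostnames()[0]);
        }
    }
}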
