Hi,
I am writing a Flink job in which I have three datasets. I have partitionedByHash the first two before coGrouping them. My plan is to spill the result of the coGroup to disk and then re-read it before coGrouping it with the third dataset. My question is: is there any way to inform Flink that the first coGroup result is already partitioned? I know I can re-partition again before coGrouping, but I would like to know if there is any way to avoid a step that was already executed. Regards.
Hi Mustafa, I'm afraid this is not possible. Although you can annotate DataSources with partitioning information, that is not enough to avoid repartitioning for a CoGroup. The reason is that CoGroup requires co-partitioning of both inputs, i.e., both inputs must be partitioned identically (same number of partitions, same partitioning function, same location of partitions). Since Flink assigns tasks to execution slots dynamically, it is not possible to co-locate data that was read from a data source with data coming from the result of another computation. 2015-05-18 15:43 GMT+02:00 Mustafa Elbehery <[hidden email]>:
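To make the co-partitioning requirement concrete, here is a minimal, Flink-independent Java sketch (the hash function and partition counts are illustrative, not Flink's actual internals): two inputs are effectively co-partitioned only if the same key maps to the same partition index on both sides, which requires the same partitioning function and the same number of partitions.

```java
import java.util.Arrays;
import java.util.List;

public class CoPartitioningDemo {
    // Illustrative partitioner: partition = hash(key) mod numPartitions.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("alice", "bob", "carol", "dave");

        // Same hash function, same partition count: keys land in matching
        // partitions on both sides, so a CoGroup could proceed without a shuffle.
        for (String k : keys) {
            int left = partitionOf(k, 4);
            int right = partitionOf(k, 4);
            System.out.println(k + " -> left=" + left + ", right=" + right
                    + (left == right ? " (co-partitioned)" : " (MISMATCH)"));
        }

        // The same function with a different partition count generally breaks
        // co-partitioning, which is why Flink cannot simply trust the claim.
        for (String k : keys) {
            System.out.println(k + ": 4 partitions -> " + partitionOf(k, 4)
                    + ", 5 partitions -> " + partitionOf(k, 5));
        }
    }
}
```

Note that even with identical partitioning, Flink's dynamic slot assignment means the third condition (same physical location of partitions) still cannot be guaranteed, which is the point of the answer above.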
Thanks for the feedback, Fabian. This is related to the question I sent to the user mailing list yesterday. Mustafa is working on a master's thesis in which we try to abstract an operator for updating stateful datasets (decoupled from the current native iterations logic) and use it in conjunction with lazy unrolling of iterations.
If I remember correctly, back in the day we had a PACT output contract that served a similar purpose (avoiding unnecessary shuffles), but I was not able to find it yesterday. In any case, even if this does not work out of the box at the moment, I think most of the logic is in place (e.g., co-location groups in the scheduler), and we are willing to either hack the code or add the missing functionality in order to realize the goal described above. Suggestions are welcome! Regards, 2015-05-18 17:42 GMT+02:00 Fabian Hueske <[hidden email]>:
Alright, so if both inputs of the CoGroup are read from the file system, there should be a way to do the co-group on co-located data without repartitioning. In fact, I have some code lying around to do co-located joins from the local FS [1]. I haven't tested it thoroughly, and it also relies on a number of assumptions, among them:
- Unique keys: keys produced by the UDF or source are unique.
If the data is also sorted, you can even get around sorting it if you inject a few lines into the optimizer (see the change for FLINK-1444) and ensure that each source reads exactly one input split. Let me know if you have questions. Cheers, Fabian 2015-05-19 13:49 GMT+02:00 Alexander Alexandrov <[hidden email]>:
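As a concept sketch of what such a co-located, pre-sorted CoGroup buys you (plain Java, not Fabian's code and not Flink's runtime): once both partition files for the same key range live on the same machine and are sorted by key, the CoGroup reduces to a single merge pass with no shuffle and no sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortMergeCoGroup {
    // Merge two key-sorted lists, emitting one group per key, the way a
    // co-located CoGroup could if inputs were already partitioned and sorted.
    static List<String> coGroup(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() || j < right.size()) {
            // Pick the smaller head key (or whichever side still has data).
            String key;
            if (j >= right.size()
                    || (i < left.size() && left.get(i).compareTo(right.get(j)) <= 0)) {
                key = left.get(i);
            } else {
                key = right.get(j);
            }
            // Collect all records with this key from both sides.
            List<String> l = new ArrayList<>(), r = new ArrayList<>();
            while (i < left.size() && left.get(i).equals(key)) { l.add(left.get(i)); i++; }
            while (j < right.size() && right.get(j).equals(key)) { r.add(right.get(j)); j++; }
            out.add(key + ": left=" + l.size() + ", right=" + r.size());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a", "b", "b", "d");
        List<String> right = Arrays.asList("b", "c", "d");
        coGroup(left, right).forEach(System.out::println);
    }
}
```

This also shows why the "each source reads exactly one input split" assumption matters: with multiple splits per parallel instance, a single merge cursor per side is no longer enough.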
Hi folks, I am reviving this thread, as I am stuck on one step towards my target. The following code does the partitioning before coGrouping and then writes to disk. I am trying to re-read the data from disk, so I have created a LocatableInputSplit[] with the size of the DOP. Find the code below: inPerson.partitionByHash("name"). TrackHost is an accumulator to track the host information, and MutuableInputFormat is a custom input format which extends TextInputFormat and implements StrictlyLocalAssignment. I am using a LocatableInputSplit as an instance field, because implementing InputSplit conflicts with TextInputFormat: both provide the createInputSplits method, and the compiler complained about that.
Again, while debugging I could see the problem in ExecutionJobVertex, line 146: the execution ignores the locatable host information I am shipping with my splits and re-creates the input splits, which somehow get the host info (the machine name) from the execution, while the task managers prepared by the scheduler are waiting for a machine named "LocalHost". Any suggestions? Regards. On Tue, May 19, 2015 at 2:16 PM, Fabian Hueske <[hidden email]> wrote:
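The symptom described above can be reproduced in miniature without Flink. The sketch below is a hypothetical simulation of a strictly-local assignment policy (not Flink's actual InputSplitAssigner code): a split is handed out only to a task whose hostname appears in the split's host list, so splits that declare "localhost" never match task managers reporting their real machine names, and those tasks wait forever.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LocalAssignmentDemo {
    // A split together with the hosts it is considered local to.
    record Split(int id, List<String> hosts) {}

    // Strictly-local policy: hand out a split only if the requesting
    // task's host appears in the split's host list.
    static Split nextLocalSplit(List<Split> splits, String taskHost) {
        for (Iterator<Split> it = splits.iterator(); it.hasNext(); ) {
            Split s = it.next();
            if (s.hosts().contains(taskHost)) {
                it.remove();
                return s;
            }
        }
        return null; // no local split: under strict assignment the task keeps waiting
    }

    public static void main(String[] args) {
        List<Split> splits = new ArrayList<>(List.of(
                new Split(0, List.of("localhost")),
                new Split(1, List.of("localhost"))));

        // Task managers report their actual machine names, not "localhost",
        // so the lookup fails even though splits are available.
        System.out.println(nextLocalSplit(splits, "worker-1"));
        System.out.println(nextLocalSplit(splits, "localhost"));
    }
}
```

If the splits the scheduler actually uses are re-created with real machine names while the prepared tasks expect "LocalHost" (or vice versa), this mismatch is exactly what stalls the job.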
I think we are still talking about the same issue as in the related question. I suspect that the MutableInputFormatTest does not properly return the splits from its createInputSplits() method. To validate that, you can write a unit test that checks whether the input format actually returns your splits from createInputSplits(). On Fri, May 29, 2015 at 5:59 PM, Mustafa Elbehery <[hidden email]> wrote:
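A self-contained sketch of the kind of check Fabian suggests, using a stand-in interface in plain Java (a real test would instantiate the actual input format class and call Flink's createInputSplits(int) directly):

```java
import java.util.Arrays;

public class CreateInputSplitsCheck {
    // Stand-in for the relevant slice of an input format's contract.
    interface SplitSource {
        String[] createInputSplits(int minNumSplits);
    }

    public static void main(String[] args) {
        // Hypothetical format that is supposed to return exactly the
        // splits we prepared, one per target host.
        String[] expected = {"split@host-a", "split@host-b"};
        SplitSource format = minNumSplits -> expected.clone();

        // The check: the splits handed to the scheduler must be the ones
        // we prepared, not freshly re-created ones with different hosts.
        String[] actual = format.createInputSplits(2);
        if (!Arrays.equals(expected, actual)) {
            throw new AssertionError("createInputSplits() did not return the prepared splits");
        }
        System.out.println("createInputSplits() returned the expected splits");
    }
}
```

If this kind of test fails against the real input format, the scheduler never sees the locatable splits, which would explain the behavior observed in ExecutionJobVertex.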