I'm impressed with the Flink API, it seems simpler and more composable than what I've seen elsewhere.
I'm trying to see how to achieve a more interactive, REPL-driven experience, similar to Spark. I'm consuming Flink from Clojure. For now I'm only interested in smaller clusters & interactive usage, so the failure recovery aspect of RDDs is less important relative to simply caching intermediate results (in this context, keeping the ResultPartitions partitions around even when theres no consuming tasks) , and dynamically extending the jobgraph. Three questions: 1) How can I prevent ResultPartitions from being released? In interactive use, RPs should not necessarily be released when there are no pending tasks to consume them. Looking at the code, its hard to understand the high-level logic of who triggers their release, how the refcounting works, etc. For instance, is releasePartitionsProducedBy called by the producer, or the consumer, or both? Is the refcount initialized at the beginning of the task setup, and decremented every time its read out? Ideally, I could force certain ResultPartitions to only be manually releasable, so I can consume them over and over. I read that Flink does not support changing the running the topology. But what about extending the topology to add more nodes? If the IntermediateResultPartition are just sitting around from previously completely tasks, this seems straightforward in principle.. would one have to fiddle with the scheduler/event system to kick things off again? 3) Can I have a ConnectionManager without a TaskManager? For interactive use, I want to selectively pull data from ResultPartitions into my local REPL process. But I don't want my local process to be a TaskManager node, and have tasks assigned to it. So this question boils down to, how to hook a ConnectionManager into the Flink communication/actor system? Thanks! |
Hi,
these are certainly valid use cases. As far is I know, the people who know most in this area are on vacation right now. They should be back in a week, I think. They should be able to give you a proper description of the current situation and some pointers. Cheers, Aljoscha > On 04 Jan 2016, at 22:47, kovas boguta <[hidden email]> wrote: > > I'm impressed with the Flink API, it seems simpler and more composable than what I've seen elsewhere. > > I'm trying to see how to achieve a more interactive, REPL-driven experience, similar to Spark. I'm consuming Flink from Clojure. > > For now I'm only interested in smaller clusters & interactive usage, so the failure recovery aspect of RDDs is less important relative to simply caching intermediate results (in this context, keeping the ResultPartitions partitions around even when theres no consuming tasks) , and dynamically extending the jobgraph. > > Three questions: > > 1) How can I prevent ResultPartitions from being released? > > In interactive use, RPs should not necessarily be released when there are no pending tasks to consume them. > > Looking at the code, its hard to understand the high-level logic of who triggers their release, how the refcounting works, etc. For instance, is releasePartitionsProducedBy called by the producer, or the consumer, or both? Is the refcount initialized at the beginning of the task setup, and decremented every time its read out? > > Ideally, I could force certain ResultPartitions to only be manually releasable, so I can consume them over and over. > > > 2) Can I call attachJobGraph on the ExecutionGraph after the job has begun executing to add more nodes? > > I read that Flink does not support changing the running the topology. But what about extending the topology to add more nodes? > > If the IntermediateResultPartition are just sitting around from previously completely tasks, this seems straightforward in principle.. would one have to fiddle with the scheduler/event system to kick things off again? > > 3) Can I have a ConnectionManager without a TaskManager? > > For interactive use, I want to selectively pull data from ResultPartitions into my local REPL process. But I don't want my local process to be a TaskManager node, and have tasks assigned to it. > > So this question boils down to, how to hook a ConnectionManager into the Flink communication/actor system? > > Thanks! > > > > > > > > > > > > > |
Reposting in hopes relevant people have returned from vacation: I'm impressed with the Flink API, it seems simpler and more composable than what I've seen elsewhere. I'm trying to see how to achieve a more interactive, REPL-driven experience, similar to Spark. I'm consuming Flinkfrom Clojure. For now I'm only interested in smaller clusters & interactive usage, so the failure recovery aspect of RDDs is less important relative to simply caching intermediate results (in this context, keeping the ResultPartitions partitions around even when theres no consuming tasks) , and dynamically extending the jobgraph. Three questions: 1) How can I prevent ResultPartitions from being released? In interactive use, RPs should not necessarily be released when there are no pending tasks to consume them. Looking at the code, its hard to understand the high-level logic of who triggers their release, how the refcounting works, etc. For instance, is releasePartitionsProducedBy called by the producer, or the consumer, or both? Is the refcount initialized at the beginning of the task setup, and decremented every time its read out? Ideally, I could force certain ResultPartitions to only be manually releasable, so I can consume them over and over. I read that Flink does not support changing the running the topology. But what about extending the topology to add more nodes? If the IntermediateResultPartition are just sitting around from previously completely tasks, this seems straightforward in principle.. would one have to fiddle with the scheduler/event system to kick things off again? 3) Can I have a ConnectionManager without a TaskManager? For interactive use, I want to selectively pull data from ResultPartitions into my local REPL process. But I don't want my local process to be a TaskManager node, and have tasks assigned to it. So this question boils down to, how to hook a ConnectionManager into the Flink communication/actor system? Thanks! On Tue, Jan 5, 2016 at 5:54 AM, Aljoscha Krettek <[hidden email]> wrote: Hi, |
Hey Kovas
sorry for the long delay. > On 10 Jan 2016, at 06:20, kovas boguta <[hidden email]> wrote: > 1) How can I prevent ResultPartitions from being released? > > In interactive use, RPs should not necessarily be released when there are no pending tasks to consume them. Max Michels did some work along those lines in a by now very out dated pull request: https://github.com/apache/flink/pull/640 > Looking at the code, its hard to understand the high-level logic of who triggers their release, how the refcounting works, etc. For instance, is releasePartitionsProducedBy called by the producer, or the consumer, or both? Is the refcount initialized at the beginning of the task setup, and decremented every time its read out? I fully agree that the respective part of the system is very much under documented. Sorry about that. - ResultPartitionManager: the result partition manager keeps track of all partitions of a task manager. Each task registers the produced partitions when it is instantiated (see Task constructor and NetworkEnvironment#registerTask). This is the final truth about which partitions are available etc. - releasePartitionsProducedBy: This is not part of the regular life cycle of the results, but only triggered by the job manager to get rid of old results in case of cancellation/failure. - Ref counts: The release of the results during normal operation happens via the ref counts. Currently, the ref counts are always initialised to the number of sub partitions (A result partition consists of 1 or more sub partitions for each parallel receiver of the result). Decrementing happens when a sub partition has been fully consumed (via ResultPartition#onConsumedSubpartition). And as soon as the ref count reaches 0, they are released. The final release happens in the ResultPartitionManager. The behaviour that would need to change is the last step in the result partition manager imo. I think #640 has some modifications in that respect, which might be helpful in figuring out the details. I think what can happen instead of the final release is that a result becomes “cached” and stay around. > Ideally, I could force certain ResultPartitions to only be manually releasable, so I can consume them over and over. How would you like to have this controlled by the user? Offer a API operation like `.pin()`? Do you need it pinned permanently until you release it or would it be ok to just cache it and maybe recompute if another task needs more memory/disk space? > 2) Can I call attachJobGraph on the ExecutionGraph after the job has begun executing to add more nodes? > > I read that Flink does not support changing the running the topology. But what about extending the topology to add more nodes? I think that should be possible. The problem at the moment is that this will recompute everything from the sources. Your suggestion makes sense and actually this was one of the motivations for #640. The newly attached nodes would back track to the produced results of the initial topology and go on from there. > If the IntermediateResultPartition are just sitting around from previously completely tasks, this seems straightforward in principle.. would one have to fiddle with the scheduler/event system to kick things off again? I think it will be possible to just submit the new parts. But I’m not sure about the details. > 3) Can I have a ConnectionManager without a TaskManager? > > For interactive use, I want to selectively pull data from ResultPartitions into my local REPL process. But I don't want my local process to be a TaskManager node, and have tasks assigned to it. > > So this question boils down to, how to hook a ConnectionManager into the Flink communication/actor system? In theory it should be possible, yes. In practice I see problems with setting up all the parameters. You have to instantiate a NettyConnectionManager. For the start method, only the NetworkBufferPool is relevant, which is easy to instantiate as well. This will take part of the network transfers. To get the data you need a SingleInputGate and set it up with RemoteChannel instances for each consumed subpartition. This is where you need to know all the partition IDs and task managers (The connection ID is a wrapper around InetSocketAddress with a connection index for connection sharing). When you have the gate setup, you can query it with the RecordReader. The input gate itself just returns byte buffers. Do you have an idea about how you want to figure out the partition and connection IDs in your repl process? If yes, I could provide some more concrete code snippets on the setup. --- If would like to contribute to Flink, we can also think about splitting this up into some smaller tasks and address them. Your ideas are definitely in line with what we wanted to have in Flink anyways. The recent focus on the streaming part of the system has pulled most contributors away from the batch parts though. I would suggest to also look at the changes in #640. Although the PR is rather old, the network stack has not seen many changes in recent times. Feel free to post further questions! :) – Ufuk |
On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi <[hidden email]> wrote: Hey Kovas It was worth the wait! Thanks for the detailed response. > Ideally, I could force certain ResultPartitions to only be manually releasable, so I can consume them over and over. If another task needs resources, I would expect cached data to be moved to lower levels of the storage hierarchy. We don't want to block/degrade new computations, because it is impossible to tell the user upfront that their new request will exceed the available resources. I think extending the approach of Tachyon might be interesting. Currently, they have up to 3 tiers: memory, local disk, durable "understore" (hdfs, s3, etc). One could add a fourth possibility, call it "null" which would mean, just drop data and recompute if needed. Then, one could specify a policy with a list like [memory, null] or [memory, local disk, null] and have pretty reasonable control. However its done, I hope its in the nice compositional, interface-driven style that Flink enjoys today. "Simple" is much better than "easy" for me. There's a bunch of questions, and having well-though-out architecture to achieve different behaviors seems like the most valuable thing to me vs any specific top-level API approach. One issue/question: How would the user refer to an existing dataset in subsequent operations? When you create a dataset at the top-level API, is it automatically assigned an ID? The separation of logical & physical plan is a Good Thing. When one is submitting additional plans, one needs to refer to previous nodes. Whether nodes are assigned identity by value versus by reference makes a difference.
This part is fairly well described in the docs, and I was able to more or less figure it out. Cool stuff. Do you have an idea about how you want to figure out the partition and connection IDs in your repl process? If yes, I could provide some more concrete code snippets on the setup. For hacking purposes my plan was to manually instantiate the execution graph and instrument some of the nodes so I can get information out. My REPL would run in the same process as the JobManager. For a "real" solution, the REPL needs seem related to the WebUI, which I haven't studied yet. One would want a fairly detailed view into the running execution graph, possibly but not necessarily as an HTTP api. I haven't studied how the execution graph is instantiated yet, again I'd rather inject this logic via composition than have it hard-coded into the existing implementation. Will have to study more. Thanks for the pointers!
|
On Thu, Jan 14, 2016 at 4:00 PM, kovas boguta <[hidden email]> wrote:
Is there a way to hook into the global or 'per-job' event stream? One approach could be to allow the user to submit a listener on the event stream, which presumably contains all the relevant information about availability and location of result partitions (and other stuff of interest). Writing a little service to send that info down to a client is pretty easy from there. |
In reply to this post by kovas boguta
> On 14 Jan 2016, at 22:00, kovas boguta <[hidden email]> wrote: > > On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi <[hidden email]> wrote: > Hey Kovas > > sorry for the long delay. > > It was worth the wait! Thanks for the detailed response. > > > Ideally, I could force certain ResultPartitions to only be manually releasable, so I can consume them over and over. > > How would you like to have this controlled by the user? Offer a API operation like `.pin()`? Do you need it pinned permanently until you release it or would it be ok to just cache it and maybe recompute if another task needs more memory/disk space? > > If another task needs resources, I would expect cached data to be moved to lower levels of the storage hierarchy. We don't want to block/degrade new computations, because it is impossible to tell the user upfront that their new request will exceed the available resources. > > I think extending the approach of Tachyon might be interesting. Currently, they have up to 3 tiers: memory, local disk, durable "understore" (hdfs, s3, etc). One could add a fourth possibility, call it "null" which would mean, just drop data and recompute if needed. Then, one could specify a policy with a list like [memory, null] or [memory, local disk, null] and have pretty reasonable control. This is in line with the plans the community has for the intermediate results in the future. :) > However its done, I hope its in the nice compositional, interface-driven style that Flink enjoys today. "Simple" is much better than "easy" for me. There's a bunch of questions, and having well-though-out architecture to achieve different behaviors seems like the most valuable thing to me vs any specific top-level API approach. > > One issue/question: > > How would the user refer to an existing dataset in subsequent operations? When you create a dataset at the top-level API, is it automatically assigned an ID? All IDs and configuration is assigned in the JobGraph structure. It is a non-parallel representation of the data flow (of the execution graph). > The separation of logical & physical plan is a Good Thing. When one is submitting additional plans, one needs to refer to previous nodes. Whether nodes are assigned identity by value versus by reference makes a difference. The new graph simply refers to the logical ID and the job manager attaches it to the old graph. The connection happens by specifying the ID of a previous result as input to a job vertex. > To get the data you need a SingleInputGate and set it up with RemoteChannel instances for each consumed subpartition. This is where you need to know all the partition IDs and task managers (The connection ID is a wrapper around InetSocketAddress with a connection index for connection sharing). > > When you have the gate setup, you can query it with the RecordReader. The input gate itself just returns byte buffers. > > This part is fairly well described in the docs, and I was able to more or less figure it out. Cool stuff. In general, for which parts would you like to see more documentation? > Do you have an idea about how you want to figure out the partition and connection IDs in your repl process? If yes, I could provide some more concrete code snippets on the setup. > > For hacking purposes my plan was to manually instantiate the execution graph and instrument some of the nodes so I can get information out. My REPL would run in the same process as the JobManager. > > For a "real" solution, the REPL needs seem related to the WebUI, which I haven't studied yet. One would want a fairly detailed view into the running execution graph, possibly but not necessarily as an HTTP api. > > I haven't studied how the execution graph is instantiated yet, again I'd rather inject this logic via composition than have it hard-coded into the existing implementation. Will have to study more. OK. In any case, feel free to post more questions here. – Ufuk |
Free forum by Nabble | Edit this page |