I was wondering about the status of the flink stop command. At first blush it would seem to be the preferable way to shut down a Flink job, but it depends on StoppableFunction being implemented by sources, and I notice that the Kafka source does not seem to implement it. In addition, the command does not support -s / --withSavepoint like cancel does.
Is stop deprecated? |
Anyone? On Mon, Sep 11, 2017 at 6:17 PM, Elias Levy <[hidden email]> wrote:
|
Hey Elias,
sorry for the delay here. No, stop is not deprecated, but it is not fully implemented yet. One missing part is the migration of the existing source functions, as you say. Let me pull in Till for more details on this.

@Till: Is there more missing than migrating the sources?

Here is the PR and discussion for reference: https://github.com/apache/flink/pull/750

I would also really love to see this fully implemented in Flink. I don't expect this to happen for the upcoming 1.4 release though.

– Ufuk

On Wed, Sep 13, 2017 at 7:07 PM, Elias Levy <[hidden email]> wrote: |
I too am curious about stop vs cancel. I'm trying to understand the motivations a bit more.

The current behavior of stop is basically that the sources become bounded, leading to the job winding down. The interesting question is how best to support 'planned' maintenance procedures such as app upgrades and scale changes. I think a good enhancement could be to stop precisely at checkpoint time to prevent emission of spurious records. Today the behavior of 'cancel w/ savepoint' is at-least-once because the two operations aren't atomic. Earlier I had assumed that 'stop' would evolve in this direction, but I suppose we could improve the atomicity of 'cancel w/ savepoint' rather than involving 'stop'.

A different direction for 'stop' might be to improve the determinism of bounding a streaming job such that the stop point is well understood in terms of the source, for example by stopping at an offset provided as a stop parameter. Today I suppose one would rely on external state to remember the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.

On Thu, Sep 14, 2017 at 1:03 AM, Ufuk Celebi <[hidden email]> wrote: |
Also relevant for this discussion: several people (including me) have by now floated the idea of reworking the source interface to take the responsibility for stopping/canceling/continuing away from a specific source implementation and to give that power to the system instead. Currently each source basically does this:

    class Source<T> {
        public void run(Context ctx, Lock lock) {
            // runs for as long as the source wants; nothing outside can interrupt it
            while (true) {
                synchronized (lock) {
                    T output = ReadFrom.externalSystem();
                    updateReadPositionState();
                    ctx.collect(output);
                }
            }
        }
    }

Meaning that any stopping/canceling behaviour requires cooperation from the source implementation. This would be a different idea for a source interface:

    abstract class NewSource<T> {
        public abstract boolean start();
        public abstract boolean advance();
        public abstract void close();

        public abstract T getCurrent();
        public abstract Instant getCurrentTimestamp();
        public abstract Instant getWatermark();

        public abstract CheckpointMark getCheckpointMark();
    }

Here the driver would sit outside and call the source whenever data should be provided. Stop/cancel would not be a feature of the source function but of the code that calls it.

Best, Aljoscha
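To make the "driver sits outside" point concrete, here is a minimal sketch of such a driver loop. It is not Flink code: PullSource, CountingSource and SourceDriver are hypothetical names, the interface is a cut-down version of the one proposed above (no timestamps, watermarks or checkpoint marks), and in this toy a false return from start()/advance() is assumed to mean end of input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified pull-style source (assumption, not a Flink interface).
interface PullSource<T> {
    boolean start();     // position on the first record; false if there is none
    boolean advance();   // move to the next record; false if there is none
    T getCurrent();      // the record the source is currently positioned on
    void close();
}

// Toy bounded source that yields 0, 1, ..., limit - 1.
class CountingSource implements PullSource<Integer> {
    private final int limit;
    private int current = -1;
    boolean closed = false;

    CountingSource(int limit) { this.limit = limit; }

    @Override public boolean start() { return advance(); }

    @Override public boolean advance() {
        if (current + 1 >= limit) {
            return false;
        }
        current++;
        return true;
    }

    @Override public Integer getCurrent() { return current; }

    @Override public void close() { closed = true; }
}

// The driver owns the loop, so stopping needs no cooperation from the source.
class SourceDriver {
    static <T> List<T> drive(PullSource<T> source, AtomicBoolean stopRequested) {
        List<T> emitted = new ArrayList<>();
        try {
            boolean hasRecord = source.start();
            // The stop flag is checked between records, outside the source.
            while (hasRecord && !stopRequested.get()) {
                emitted.add(source.getCurrent());
                hasRecord = source.advance();
            }
        } finally {
            source.close();
        }
        return emitted;
    }
}
```

Because the stop flag is only checked by SourceDriver, the same loop could in principle also decide to stop exactly at a checkpoint boundary, which is the behaviour discussed earlier in the thread.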
|
Aljoscha, would it be correct to characterize your idea as a 'pull' source rather than the current 'push'? It would be interesting to look at the existing connectors to see how hard it would be to reverse their orientation. For example, the source might require a buffer pool. On Fri, Sep 15, 2017 at 9:05 AM, Aljoscha Krettek <[hidden email]> wrote:
|
On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright <[hidden email]> wrote:
The Kafka client works that way, as does the QueueingConsumer used by the RabbitMQ source. The Kinesis and NiFi sources also seem to poll. Those are all the bundled sources. |
@Eron Yes, that would be the difference in characterisation. I think technically all sources could be transformed that way by pushing data into a (blocking) queue and having the "getElement()" method pull from that.
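A rough sketch of that queue-based adaptation, under stated assumptions: QueueBackedSource is a hypothetical name, the push-style source is modelled as a function that receives a collect callback, and getElement takes a timeout here so the caller never blocks indefinitely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical adapter (not a Flink class): a push-style loop runs in its own
// thread and hands records over through a bounded blocking queue.
class QueueBackedSource<T> implements AutoCloseable {

    // Used internally to unwind the push loop when close() interrupts it.
    private static final class StoppedException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    private final BlockingQueue<T> queue;
    private final Thread producer;

    // pushLoop is given a callback and calls it once per record,
    // in the same way a push source calls ctx.collect(...).
    QueueBackedSource(int capacity, Consumer<Consumer<T>> pushLoop) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.producer = new Thread(() -> {
            try {
                pushLoop.accept(this::enqueue);
            } catch (StoppedException e) {
                // close() was called while the loop was blocked; exit quietly
            }
        });
        this.producer.setDaemon(true);
        this.producer.start();
    }

    private void enqueue(T record) {
        try {
            queue.put(record);   // blocks while the queue is full
        } catch (InterruptedException e) {
            throw new StoppedException();
        }
    }

    // Pull side: waits up to the timeout and returns null if nothing arrived.
    T getElement(long timeout, TimeUnit unit) {
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void close() {
        producer.interrupt();
    }
}
```

The bounded capacity is a design choice: it makes the push side block when the puller falls behind, at the cost of one extra thread and one hand-off per record.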
|
I would propose that implementations of NewSource be non-blocking/asynchronous. For example, something like

    public abstract Future<T> getCurrent();

which would allow us to perform certain actions while there is no data available to process (for example, flushing output buffers). Something like this came up recently when we were discussing possible future changes in the network stack. It wouldn’t complicate the API by a lot, since a default implementation could just do:

    public Future<T> getCurrent() {
        return completedFuture(getCurrentBlocking());
    }

Another thing to consider: maybe we would like to leave the door open for fetching records in batches from the source’s input buffers? Source functions (like Kafka's) have internal buffers, and it would be more efficient to read/deserialise all data present in the input buffer at once instead of paying the synchronisation, virtual method call and similar costs once per record.

Piotrek
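As an illustration of the non-blocking idea, here is a small sketch in which getCurrent() returns a future instead of blocking. AsyncBufferSource and offer are hypothetical names, and CompletableFuture is used here in place of the plain Future from the proposal so that the future can be completed from the fetcher side; a single consumer is assumed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical non-blocking source (assumption, not a Flink class).
class AsyncBufferSource<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<T> pending;   // non-null while the caller waits for data

    // Called by the driver. Never blocks: returns a completed future if a
    // record is buffered, otherwise a future completed by the next offer().
    synchronized CompletableFuture<T> getCurrent() {
        T next = buffer.poll();
        if (next != null) {
            return CompletableFuture.completedFuture(next);
        }
        if (pending == null) {
            pending = new CompletableFuture<>();
        }
        return pending;
    }

    // Called by the fetcher thread when a record arrives.
    void offer(T record) {
        CompletableFuture<T> waiter;
        synchronized (this) {
            waiter = pending;
            pending = null;
            if (waiter == null) {
                buffer.add(record);
            }
        }
        // Complete outside the lock so callbacks do not run while holding it.
        if (waiter != null) {
            waiter.complete(record);
        }
    }
}
```

While the returned future is not yet done, the driver is free to do other work such as flushing output buffers, which is the motivation given above.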
|
I am re-upping this thread now that FlinkKafkaProducer011 is out. The new producer, when used with exactly-once semantics, has the rather troublesome behavior that it will fall back to at-most-once, rather than at-least-once, if the job is down for longer than the Kafka broker's transaction.max.timeout.ms setting. In situations that require extended maintenance downtime, this behavior is nearly certain to lead to message loss, as canceling a job while taking a savepoint will not wait for the Kafka transactions to be committed and is not atomic. So it seems like there is a need for an atomic stop or cancel with savepoint that waits for transactional sinks to commit and then immediately stops any further message processing. On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski <[hidden email]> wrote:
|
Hi,
Yes, we are aware of this issue and we would like to have it soon, but at the moment it does not look like clean shutdown will be ready for Flink 1.5. Another solution is a Kafka exactly-once producer implemented on top of the GenericWriteAheadSink. It could avoid this issue (at the cost of significantly higher overhead). There are plans to implement such a producer as an alternative to the current one, but I do not know the timeline for that. It should be a relatively easy task and we would welcome such a contribution. Piotrek
|