Flink flick cancel vs stop

Elias Levy
I was wondering about the status of the flink stop command. At first blush it would seem to be the preferable way to shut down a Flink job, but it depends on StoppableFunction being implemented by the sources, and I notice that the Kafka source does not seem to implement it. In addition, the command does not accept -s / --withSavepoint like cancel does.

Is stop deprecated?
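
For reference, a source that supports stop has to implement StoppableFunction on top of the usual SourceFunction. Below is a minimal sketch of what that could look like; the class name, the fetchNextRecord() helper and the flag handling are made up for illustration and are not taken from any bundled connector (import paths are from the Flink 1.x APIs).

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyStoppableSource implements SourceFunction<String>, StoppableFunction {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            String record = fetchNextRecord();  // hypothetical read from the external system
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
            }
        }
        // leaving run() lets the job wind down gracefully
    }

    @Override
    public void cancel() {
        // cancel: tear the job down as fast as possible
        isRunning = false;
    }

    @Override
    public void stop() {
        // stop: finish reading gracefully (stop is only accepted if all sources are stoppable)
        isRunning = false;
    }

    private String fetchNextRecord() {
        return "...";  // placeholder
    }
}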

Re: Flink flick cancel vs stop

Elias Levy
Anyone?

Re: Flink flick cancel vs stop

Ufuk Celebi
Hey Elias,

sorry for the delay here. No, stop is not deprecated, but it is not
fully implemented yet. One missing part is the migration of the
existing source functions, as you say.

Let me pull in Till for more details on this. @Till: Is there more
missing than migrating the sources?

Here is the PR and discussion for reference:
https://github.com/apache/flink/pull/750

I would also really love to see this fully implemented in Flink. I
don't expect this to happen for the upcoming 1.4 release though.

– Ufuk


Re: Flink flick cancel vs stop

Eron Wright
I too am curious about stop vs cancel.  I'm trying to understand the motivations a bit more.

The current behavior of stop is basically that the sources become bounded, leading to the job winding down.

The interesting question is how best to support 'planned' maintenance procedures such as app upgrades and scale changes. I think a good enhancement could be to stop precisely at checkpoint time, to prevent the emission of spurious records. Today the behavior of 'cancel w/ savepoint' is at-least-once because the two operations aren't atomic. Earlier I had assumed that 'stop' would evolve in this direction, but I suppose we could improve the atomicity of 'cancel w/ savepoint' rather than implicating 'stop'.

A different direction for 'stop' might be to improve the determinism of bounding a streaming job, so that the stop point is well understood in terms of the source. For example, stopping at an offset provided as a stop parameter. Today I suppose one would rely on external state to remember the stop point, e.g. FlinkKafkaConsumer010::setStartFromGroupOffsets.
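
To make the offset-bounded idea concrete, here is a rough sketch of a source whose read loop simply exits once it reaches a stop offset handed in as a parameter. None of this is an existing Flink or connector API; OffsetBoundedSource and readRecordAt() are made up for illustration.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class OffsetBoundedSource implements SourceFunction<String> {

    private final long stopOffset;            // the agreed, deterministic stop point
    private volatile boolean running = true;
    private long currentOffset;

    public OffsetBoundedSource(long startOffset, long stopOffset) {
        this.currentOffset = startOffset;
        this.stopOffset = stopOffset;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        while (running && currentOffset < stopOffset) {
            String record = readRecordAt(currentOffset);  // hypothetical external read
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
                currentOffset++;
            }
        }
        // returning from run() bounds the stream; the job then winds down
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String readRecordAt(long offset) {
        return "record-" + offset;  // placeholder
    }
}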

Re: Flink flick cancel vs stop

Aljoscha Krettek
Also relevant for this discussion: several people (including me) have by now floated the idea of reworking the source interface to take the responsibility for stopping/canceling/continuing away from the specific source implementation and instead give that power to the system. Currently each source basically does this:

class Source<T> {
  public void run(Context ctx, Lock lock) {
    while (true) { // for as long as I want, and I don't care
      synchronized (lock) {
        T output = ReadFrom.externalSystem();
        updateReadPositionState();
        ctx.collect(output);
      }
    }
  }
}

Meaning that any stopping/canceling behaviour requires cooperation from the source implementation.

This would be a different idea for a source interface:

abstract class NewSource<T> {
  public abstract boolean start();
  public abstract boolean advance();
  public abstract void close();

  public abstract T getCurrent();
  public abstract Instant getCurrentTimestamp();
  public abstract Instant getWatermark();

  public abstract CheckpointMark getCheckpointMark();
}

Here the driver would sit outside and call the source whenever data should be provided. Stop/cancel would not be a feature of the source function but of the code that calls it.
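
To make the 'driver sits outside' idea concrete, a rough sketch of such a driver, written against the NewSource interface above, could look like the following. SourceDriver, emit(), and the stop flag are made up, and it assumes start()/advance() return whether an element is currently available:

import java.time.Instant;

class SourceDriver<T> {

    private volatile boolean stopRequested = false;  // flipped by the framework on stop/cancel

    void run(NewSource<T> source) throws Exception {
        boolean available = source.start();
        try {
            while (!stopRequested) {
                if (available) {
                    emit(source.getCurrent(), source.getCurrentTimestamp());
                }
                available = source.advance();
            }
            // a graceful stop could snapshot source.getCheckpointMark() here
        } finally {
            source.close();
        }
    }

    void requestStop() {
        stopRequested = true;
    }

    private void emit(T element, Instant timestamp) {
        // hand the element to the downstream operators (details omitted)
    }
}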

Best,
Aljoscha

Re: Flink flick cancel vs stop

Eron Wright
Aljoscha, would it be correct to characterize your idea as a 'pull' source rather than the current 'push'? It would be interesting to look at the existing connectors to see how hard it would be to reverse their orientation; e.g., the source might require a buffer pool.

Re: Flink flick cancel vs stop

Elias Levy
The Kafka client works that way, as does the QueueingConsumer used by the RabbitMQ source. The Kinesis and NiFi sources also seem to poll. Those are all the bundled sources.

Re: Flink flick cancel vs stop

Aljoscha Krettek
@Eron Yes, that would be the difference in characterisation. I think technically all sources could be transformed that way, by pushing data into a (blocking) queue and having the "getElement()" method pull from that.
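
As a rough sketch of that transformation (class and method names made up): the existing push-style client keeps pushing into a bounded BlockingQueue on its own thread, and the pull-style method simply takes from it, which also gives natural back pressure when the buffer is full.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PushToPullAdapter<T> {

    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024);

    // called by the existing push-style client, typically on its own thread
    void push(T element) throws InterruptedException {
        queue.put(element);   // blocks while the buffer is full
    }

    // called by the driver whenever it wants the next element
    T getElement() throws InterruptedException {
        return queue.take();  // blocks until an element is available
    }
}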

Re: Flink flick cancel vs stop

Piotr Nowojski
I would propose that implementations of NewSource be non-blocking/asynchronous. For example, something like

public abstract Future<T> getCurrent();

This would allow us to perform certain actions while no data is available to process (for example, flushing output buffers). Something like this came up recently when we were discussing possible future changes in the network stack. It wouldn't complicate the API by much, since a default implementation could just do:

public Future<T> getCurrent() {
  return completedFuture(getCurrentBlocking());
}

Another thing to consider is whether we would like to leave the door open for fetching records in batches from the source's input buffers. Source functions (like the Kafka one) have some internal buffers, and it would be more efficient to read and deserialise all data present in the input buffer at once, instead of paying the synchronisation/virtual call/etc. costs once per record.
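
One possible shape for that, purely hypothetical and not an existing Flink API, would be a batched variant next to the single-record one, so the per-record synchronisation and virtual-call cost is paid once per batch (CompletableFuture is used here as a concrete stand-in for the Future above):

import java.util.List;
import java.util.concurrent.CompletableFuture;

public abstract class BatchedNewSource<T> {

    // completes with everything that is already buffered, up to maxRecords
    public abstract CompletableFuture<List<T>> getNextBatch(int maxRecords);

    // single-record path expressed on top of the batched one
    public CompletableFuture<T> getCurrent() {
        return getNextBatch(1)
                .thenApply(batch -> batch.isEmpty() ? null : batch.get(0));
    }
}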

Piotrek

Re: Flink flick cancel vs stop

Elias Levy
I am re-upping this thread now that FlinkKafkaProducer011 is out. The new producer, when used with the exactly-once semantic, has the rather troublesome behavior that it will fall back to at-most-once, rather than at-least-once, if the job is down for longer than the Kafka broker's transaction.max.timeout.ms setting.

In situations that require extended maintenance downtime, this behavior is nearly certain to lead to message loss, as canceling a job while taking a savepoint will not wait for the Kafka transactions to be committed and is not atomic.

So it seems there is a need for an atomic stop or cancel-with-savepoint that waits for transactional sinks to commit and then immediately stops any further message processing.
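
Until such an atomic stop exists, the usual mitigation (not a fix) is to give the producer a transaction timeout that covers the planned downtime and to raise the broker's transaction.max.timeout.ms accordingly. A sketch, with placeholder broker address and topic, assuming the Kafka 0.11 connector's (topic, schema, properties, semantic) constructor; import paths are as of Flink 1.4 and may differ:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka:9092");     // placeholder
producerProps.setProperty("transaction.timeout.ms", "3600000");   // 1 hour; must not exceed the broker's transaction.max.timeout.ms

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "my-topic",                                                // placeholder
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);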
 

Re: Flink flick cancel vs stop

Piotr Nowojski
Hi,

Yes, we are aware of this issue and we would like to have it addressed soon, but at the moment it does not look like a clean shutdown will be ready for Flink 1.5.

Another solution is a Kafka exactly-once producer implemented on top of the GenericWriteAheadSink. It could avoid this issue (at the cost of significantly higher overhead). There are plans to implement such a producer as an alternative to the current one, but I do not know the timeline for that. It should be a relatively easy task and we would welcome such a contribution.

Piotrek
