How to stop FlinkKafkaConsumer and make job finished?

jaxon
I would like to manually stop a FlinkKafkaConsumer from consuming data from Kafka, but I find that it doesn't close when I invoke its "cancel()" method. What I am trying to do is add an EOF symbol that marks the end of my Kafka data; when the FlatMap operator reads that symbol, it invokes the FlinkKafkaConsumer's "cancel()" method. This doesn't work. When I use Kafka as a source, the Flink streaming job won't finish unless it is canceled or fails.

Could someone who knows how to do this give me some help? Thanks~~

Re: How to stop FlinkKafkaConsumer and make job finished?

Ufuk Celebi
Hey Jaxon,

I don't think it's possible to control this via the life-cycle methods
of your functions.

Note that Flink currently does not support stopping a job gracefully in
any meaningful way; you can only cancel running jobs. Here is what comes
to mind for canceling on EOF:

1) Extend the Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not feasible, and it takes some work to get familiar with the
code. Just putting it out there.

2) Throw a "SuccessException" that fails the job. Easy, but not nice.

3) Use an HTTP client and cancel your job via the HTTP endpoint
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
Easy, but not nice, since you need quite a bit of logic in your function
(e.g. ignoring records that arrive after the EOF record until the
cancellation takes effect, etc.).
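Options 2 and 3 can be sketched with the JDK alone (Java 11+ for java.net.http). SuccessException, isSuccess and cancelRequest are hypothetical helpers, not Flink API. For option 2, a user function throws SuccessException when it reads the EOF record, and the client that submitted the job checks whether the failure it receives was caused by it; depending on the Flink version and classpath, the original exception may arrive wrapped or in serialized form, so an instanceof check like this one might need adjusting. For option 3, the endpoint path is an assumption based on the Flink 1.4 REST documentation linked above (DELETE /jobs/:jobid/cancel); newer Flink versions use PATCH /jobs/:jobid instead, so check the docs for your version.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helpers for options 2 and 3; none of these names are Flink API. */
class EofShutdownHelpers {

    /** Option 2: thrown on purpose by a user function when it reads the EOF record. */
    static class SuccessException extends RuntimeException {
        SuccessException() {
            super("EOF marker reached");
        }
    }

    /**
     * The job then fails, and the exception reaching the client typically wraps the
     * original one. Walking the cause chain separates a deliberate stop from a real failure.
     */
    static boolean isSuccess(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Option 3: builds a cancellation request. Assumed endpoint: DELETE /jobs/:jobid/cancel
     * (Flink 1.4 monitoring API); newer versions use PATCH /jobs/:jobid instead.
     */
    static HttpRequest cancelRequest(String restBaseUrl, String jobId) {
        URI uri = URI.create(restBaseUrl + "/jobs/" + jobId + "/cancel");
        return HttpRequest.newBuilder(uri).DELETE().build();
    }
}
```

Sending the request would then be, for example, `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())`; the job ID has to be obtained separately, e.g. from the web UI.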

Maybe Aljoscha (cc'd) has an idea how to do this in a better way.

– Ufuk


In reply to Jaxon Hu's message of Mon, Dec 25, 2017 at 8:59 AM.

Re: How to stop FlinkKafkaConsumer and make job finished?

Eron Wright
I believe you can extend the `KeyedDeserializationSchema` that you pass to the consumer to check for end-of-stream markers.
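To make the idea concrete without pulling in Flink, here is a stdlib-only model. The EndOfStreamSchema interface is a hypothetical stand-in that mirrors only the two relevant methods of Flink's KeyedDeserializationSchema (deserialize and isEndOfStream; the real deserialize also declares IOException); with the real connector you would implement that interface and pass it to the consumer's constructor. The "EOF" marker and the FetchLoopModel loop are assumptions for illustration. As far as I can tell, in the 1.4-era Kafka fetchers a record for which isEndOfStream returns true is not emitted, and the subtask stops fetching, after which the source's run() can return; a subtask with no partitions assigned keeps waiting, which is the case Arnaud describes later in the thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the two relevant methods of KeyedDeserializationSchema. */
interface EndOfStreamSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);

    boolean isEndOfStream(T nextElement);
}

/** Treats a record whose value is exactly "EOF" (an assumed marker) as the end of the stream. */
class EofMarkerSchema implements EndOfStreamSchema<String> {
    static final String EOF_MARKER = "EOF";

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return EOF_MARKER.equals(nextElement);
    }
}

/** Simplified model of a fetch loop that stops once the schema signals end of stream. */
class FetchLoopModel {
    static <T> List<T> consume(List<byte[]> records, EndOfStreamSchema<T> schema) {
        List<T> emitted = new ArrayList<>();
        long offset = 0;
        for (byte[] raw : records) {
            T value = schema.deserialize(null, raw, "my-topic", 0, offset++);
            if (schema.isEndOfStream(value)) {
                break; // the marker itself is not emitted, and nothing after it is read
            }
            emitted.add(value); // stands in for emitting the record downstream
        }
        return emitted;
    }
}
```

Feeding the records "a", "b", "EOF", "c" through this loop emits only "a" and "b".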


Eron

In reply to Ufuk Celebi's message of Wed, Dec 27, 2017 at 12:35 AM.

Re: How to stop FlinkKafkaConsumer and make job finished?

Ufuk Celebi
Yes, that sounds like what Jaxon is looking for. :-) Thanks for the
pointer, Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <[hidden email]> wrote:

> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
>
> Eron

Re: How to stop FlinkKafkaConsumer and make job finished?

jaxon
In reply to this post by Eron Wright
Thanks, Eron.
I have tried reading an EOF symbol and then invoking FlinkKafkaConsumer's cancel()
method, but it doesn't work. However, I invoke the method in a FlatMap operator
that comes after the source operator; I guess that is the problem. I will try
your suggestion, thanks.
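Jaxon's guess seems plausible. As I understand Flink's execution model, user functions (including the source) are serialized when the job is submitted, and each task deserializes its own instance, often in a different JVM. A FlinkKafkaConsumer reference captured by the FlatMap is therefore a separate copy, and calling cancel() on it leaves the running source untouched. This stdlib-only model reproduces the effect with plain Java serialization; FlagSource, FlatMapHoldingSource and CopyDemo are hypothetical names, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical source with the usual volatile "running" flag that cancel() clears. */
class FlagSource implements Serializable {
    private volatile boolean running = true;

    void cancel() {
        running = false;
    }

    boolean isRunning() {
        return running;
    }
}

/** Hypothetical FlatMap-like function that captured the source when the job was defined. */
class FlatMapHoldingSource implements Serializable {
    final FlagSource source;

    FlatMapHoldingSource(FlagSource source) {
        this.source = source;
    }

    /** What the FlatMap does when it reads the EOF record. */
    void onEof() {
        source.cancel();
    }
}

class CopyDemo {
    /** Serializes and deserializes an object, roughly what happens when a task receives its function. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

Round-tripping the source and the FlatMap separately, then calling onEof() on the FlatMap's copy, cancels only the FlatMap's private copy of the source; the copy standing in for the source task keeps running.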



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

RE: How to stop FlinkKafkaConsumer and make job finished?

LINZ, Arnaud
In reply to this post by Ufuk Celebi
Hi,

My 2 cents: IMHO, being unable to stop a Flink stream gracefully and programmatically is what the framework lacks most. It's a very common use case: each time you want to update the application or change its configuration, you need to stop and restart it cleanly, without triggering alerts, losing data, or anything else.
That's why I never use the provided Flink sources "out of the box". I've built a framework that encapsulates them, adding a monitoring thread that periodically checks for a special "HDFS stop file" and tries to cancel() the source gracefully if the user has requested a stop this way. (I've found the HDFS-file trick to be the easiest way for an external application to reach all the task managers, which run on unknown hosts.)
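Arnaud's stop-file approach might look roughly like the following sketch. It is an illustration under assumptions, not his framework: it checks a local path with java.nio.file instead of HDFS (with Hadoop on the classpath, its FileSystem#exists would play that role), and Cancellable is a hypothetical stand-in for the wrapped source's cancel(). Note that cancel() only helps if the source's run loop actually re-checks its flag, which is exactly the problem described next.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a source's cancel() method. */
interface Cancellable {
    void cancel();
}

/** Polls for a stop file and cancels the wrapped source (once) when the file appears. */
class StopFileMonitor implements AutoCloseable {
    private final Path stopFile;
    private final Cancellable source;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "stop-file-monitor");
                t.setDaemon(true); // never keep the JVM alive just for monitoring
                return t;
            });
    private boolean stopRequested = false; // guarded by "this"

    StopFileMonitor(Path stopFile, Cancellable source) {
        this.stopFile = stopFile;
        this.source = source;
    }

    /** Starts periodic checks on a background thread. */
    void start(long periodMillis) {
        scheduler.scheduleWithFixedDelay(this::checkOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Performs a single check; returns true once a stop has been requested. */
    synchronized boolean checkOnce() {
        if (!stopRequested && Files.exists(stopFile)) {
            stopRequested = true;
            source.cancel();
        }
        return stopRequested;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In a real wrapper, start() would be called from the source's open() or run(), and close() from its close().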

I could not use the "special message" trick because in most real production environments you cannot, as a client, post a message to a queue just for your own needs: you don't have the access rights to do so, and you don't know how other clients connected to the same data may react to fake messages...

Unfortunately, most Flink sources cannot be cancelled gracefully without changing part of their code. This is the case for the Kafka source:

- If a Kafka consumer source instance is not assigned any partition (for instance, because the source parallelism exceeds the number of Kafka partitions), it ends up in an infinite wait in FlinkKafkaConsumerBase.run() until the thread is interrupted:

    // wait until this is canceled
    final Object waitLock = new Object();
    while (running) {
        try {
            //noinspection SynchronizationOnLocalVariableOrMethodParameter
            synchronized (waitLock) {
                waitLock.wait();
            }
        }
        catch (InterruptedException e) {
            if (!running) {
                // restore the interrupted state, and fall through the loop
                Thread.currentThread().interrupt();
            }
        }
    }

So either you change the code, or your monitoring thread interrupts the source thread; but that triggers the HA mechanism, and the source instance will be relaunched n times before the job finally fails.
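A minimal sketch of the "change the code" route, written as a standalone model rather than a patch to FlinkKafkaConsumerBase (IdleSourceModel is a hypothetical name, and this is not how Flink itself addressed the issue). Compared with the loop quoted above, the lock is a shared field that cancel() notifies, and a timed wait re-checks the flag as a safety net, so a plain cancel() ends run() without interrupting the thread.

```java
/** Hypothetical model of an idle source that a plain cancel() can stop. */
class IdleSourceModel {
    private final Object waitLock = new Object(); // shared, so cancel() can notify it
    private volatile boolean running = true;

    /** Blocks until cancel() is called, then returns normally (no interrupt needed). */
    void run() throws InterruptedException {
        synchronized (waitLock) {
            while (running) {
                waitLock.wait(1000); // timed wait: re-check "running" at least once per second
            }
        }
    }

    void cancel() {
        running = false;
        synchronized (waitLock) {
            waitLock.notifyAll(); // wake run() immediately instead of waiting for the timeout
        }
    }
}
```

Because run() checks the flag while holding the lock before every wait, a cancel() that happens before run() starts waiting is not lost.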

- BTW, the same is true for RMQSource, since "nextDelivery" in RMQSource.run() is called without a timeout:
    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        while (running) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

So if no message arrives, the "while (running)" condition is never re-checked, and the source cannot be cancelled without a hard interruption.
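The same idea applied to a blocking receive: wait with a timeout so the flag is re-checked even when no message arrives. In this stdlib-only sketch a BlockingQueue stands in for the broker client, and PollingSourceModel is a hypothetical name, not RMQSource. As far as I know, the RabbitMQ Java client's QueueingConsumer also offers a nextDelivery(long timeout) overload that returns null when the timeout expires, which would allow the same pattern there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical receive loop that stays cancellable while no messages arrive. */
class PollingSourceModel {
    private final BlockingQueue<String> deliveries; // stands in for the broker client
    private final List<String> received = new ArrayList<>(); // only touched by the run() thread
    private volatile boolean running = true;

    PollingSourceModel(BlockingQueue<String> deliveries) {
        this.deliveries = deliveries;
    }

    void run() throws InterruptedException {
        while (running) {
            // Bounded wait: returns null on timeout, so "running" is re-checked regularly.
            String delivery = deliveries.poll(100, TimeUnit.MILLISECONDS);
            if (delivery != null) {
                received.add(delivery); // stands in for ctx.collect(...)
            }
        }
    }

    void cancel() {
        running = false;
    }

    /** Safe to read after the run() thread has been joined. */
    List<String> received() {
        return received;
    }
}
```

The timeout trades a little latency on cancellation (at most one poll interval) for never blocking indefinitely.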

Best regards,
Arnaud


-----Original Message-----
From: Ufuk Celebi [mailto:[hidden email]]
Sent: Friday, December 29, 2017 10:30
To: Eron Wright <[hidden email]>
Cc: Ufuk Celebi <[hidden email]>; Jaxon Hu <[hidden email]>; user <[hidden email]>; Aljoscha Krettek <[hidden email]>
Subject: Re: How to stop FlinkKafkaConsumer and make job finished?

________________________________

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Re: How to stop FlinkKafkaConsumer and make job finished?

Timo Walther
Hi Arnaud,

thanks for letting us know your workaround. I agree that this is a
frequently asked topic and important in certain use cases. I'm sure that
it will be solved in the near future depending on the priorities.

My 2 cents: Flink is an open source project, so maybe somebody is willing to
work on a solution for one or more Flink sources :)

Regards,
Timo


In reply to the message from LINZ, Arnaud on 1/2/18 at 4:50 PM.

Re: How to stop FlinkKafkaConsumer and make job finished?

Aljoscha Krettek
Hi,

There is a discussion about this in the Jira Issue: https://issues.apache.org/jira/browse/FLINK-7883. See especially my last comment there.

Best,
Aljoscha

In reply to Timo Walther's message of 2 Jan 2018 at 17:19.
