Stateful functions 2.2 and stop with savepoint

Meissner, Dylan
I have an embedded function with a SinkFunction as an egress, implemented roughly as in this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE))

Checkpointing, and taking a savepoint without stopping the job, both work as expected.

However, when I run "flink stop <job-id>" or even "flink stop --drain <job-id>", the operation never completes. It keeps reporting IN_PROGRESS until I hit the "failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing" CompletedException.

The "Checkpoint History" shows that only 2 of my 3 operators completed their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size: 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge: 0/1 (0%) | end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?

Re: Stateful functions 2.2 and stop with savepoint

Kezhu Wang
Hi,

Thanks for reporting. I think it is a Flink bug and have created FLINK-21522 for it. You can track progress there.




Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan ([hidden email]) wrote:


Re: Stateful functions 2.2 and stop with savepoint

Meissner, Dylan
Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around it by taking two separate actions when stopping the job: take a savepoint, then cancel.
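
A minimal sketch of this two-step workaround, assuming the standard `flink savepoint <jobId> [targetDirectory]` and `flink cancel <jobId>` CLI commands. The helper names, the job ID, and the savepoint directory are hypothetical placeholders. The sketch only builds and prints the command lines unless `dry_run=False` is passed.

```python
import subprocess


def savepoint_then_cancel(job_id, savepoint_dir, flink_bin="flink"):
    """Build the two CLI invocations: trigger a savepoint, then cancel the job."""
    return [
        [flink_bin, "savepoint", job_id, savepoint_dir],  # step 1: take a savepoint
        [flink_bin, "cancel", job_id],                    # step 2: cancel the job
    ]


def run_steps(commands, dry_run=True):
    """Print each command; execute it only when dry_run is False."""
    for argv in commands:
        print(" ".join(argv))
        if not dry_run:
            # check=True aborts the sequence if a step fails, so the job
            # is not cancelled when the savepoint could not be taken.
            subprocess.run(argv, check=True)


if __name__ == "__main__":
    # Hypothetical job ID and target directory, for illustration only.
    run_steps(savepoint_then_cancel("<job-id>", "/tmp/savepoints"))
```

Running the steps with `check=True` means a failed savepoint stops the sequence before the cancel is issued.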

From: Kezhu Wang <[hidden email]>
Sent: Sunday, February 28, 2021 12:31 AM
To: [hidden email] <[hidden email]>; Meissner, Dylan <[hidden email]>
Subject: Re: Stateful functions 2.2 and stop with savepoint
 

Re: Stateful functions 2.2 and stop with savepoint

Kezhu Wang
Hi,

You could also try `cancel --withSavepoint [savepointDir]`, even though it is deprecated. Compared with the take-a-savepoint-then-cancel approach, no checkpoints are taken in between. This may be important if there are two-phase-commit operators in your job.
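
A minimal sketch of that single-command variant, assuming the deprecated `--withSavepoint` (`-s`) option of `flink cancel`. The helper name, job ID, and savepoint directory are hypothetical placeholders, and the sketch only builds the command line rather than running it.

```python
def cancel_with_savepoint(job_id, savepoint_dir, flink_bin="flink"):
    """Build one CLI invocation that takes a savepoint and cancels the job."""
    # A single command, so no separate cancel step follows the savepoint.
    return [flink_bin, "cancel", "--withSavepoint", savepoint_dir, job_id]


if __name__ == "__main__":
    # Hypothetical job ID and target directory, for illustration only.
    print(" ".join(cancel_with_savepoint("<job-id>", "/tmp/savepoints")))
```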


Best,
Kezhu Wang


On February 28, 2021 at 20:50:29, Meissner, Dylan ([hidden email]) wrote:


Re: Stateful functions 2.2 and stop with savepoint

Piotr Nowojski-5
Hi Meissner,

Can you clarify whether you are talking about stateful functions [1] or stream iterations [2]? The first e-mail suggests stateful functions, but the ticket that Kezhu created is about the latter.

Piotrek




On Sun, 28 Feb 2021 at 15:33, Kezhu Wang <[hidden email]> wrote:

Re: Stateful functions 2.2 and stop with savepoint

Kezhu Wang
Hi all,

My BAD!!!

Sorry for the apparent mix-up.

I will write a separate test for stream iterations.


The stateful functions part should be a separate issue.


Best,
Kezhu Wang


On March 4, 2021 at 22:13:48, Piotr Nowojski ([hidden email]) wrote:


Re: Stateful functions 2.2 and stop with savepoint

Piotr Nowojski-5
It doesn't change much ;) There is a known issue with stop-with-savepoint not working for stateful functions [1]. The difference is that we will probably want to tackle this one sooner or later. Old streaming iterations are probably dead.


On Thu, 4 Mar 2021 at 15:56, Kezhu Wang <[hidden email]> wrote:

Re: Stateful functions 2.2 and stop with savepoint

Meissner, Dylan
Thank you for this information, Piotr.

Igal Shilman's comment on the FLINK-18894 issue says, "Obtaining a MAX_PRIO mailbox from StreamTask, solves this issue." I'm unclear what this means -- is this a workaround I can leverage?

Dylan

From: Piotr Nowojski <[hidden email]>
Sent: Thursday, March 4, 2021 7:03 AM
To: Kezhu Wang <[hidden email]>
Cc: Meissner, Dylan <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Stateful functions 2.2 and stop with savepoint
 

Re: Stateful functions 2.2 and stop with savepoint

Igal Shilman
Hi Dylan,

Unfortunately, stop with savepoint is not supported with StateFun.
We will bump the priority of this issue and try to address it in the next bugfix release.

Thanks,
Igal.

On Mon, Mar 8, 2021 at 9:08 PM Meissner, Dylan <[hidden email]> wrote: