Failed to cancel a job using the STOP rest API

Failed to cancel a job using the STOP rest API

Thomas Wang
Hi, Flink community,

I'm trying to use the STOP REST API to cancel a job. So far, I'm seeing inconsistent results: sometimes jobs are cancelled successfully, while other times they aren't. Either way, the POST request is accepted with status code 202 and a "request-id".

From the Flink UI, I can see the savepoint completing successfully. However, the job is still in the RUNNING state afterwards. The CLI command `flink stop <JOB ID>` works fine: I can use the CLI to stop the job and get the resulting savepoint location. If I understand this correctly, the CLI should be using the same REST API behind the scenes, shouldn't it?

Here is my POST request URL: `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.

Here is the BODY of the request: `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
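
For reference, here is roughly how I'm issuing the request programmatically (a minimal sketch using Java 11's HttpClient; the host, port, and job ID below are placeholders for our actual values):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopJobRequest {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with the real JobManager host/port and job ID.
        String base = "http://jobmanager-host:33557";
        String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";

        // Same body as above: no drain, explicit savepoint target directory.
        String body = "{\"drain\":false,\"targetDirectory\":\"s3://<BUCKET-NAME>/flink-savepoint\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // The request is accepted with a 202 and a JSON body containing a "request-id".
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```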

I'm using Flink 1.11.2 (commit ID: DeadD0d0).

Any suggestions on how I can debug this?

Another question: given the "request-id" in the response, which endpoint should I query to get the status of the request? Most importantly, where can I get the resulting savepoint location?

Thanks.

Thomas

Re: Failed to cancel a job using the STOP rest API

Yun Gao
Hi Thomas,

I think you are right that the CLI uses the same REST API under the hood. Since the REST call is accepted and the savepoint is triggered successfully, I reckon the problem is probably not in the REST API handling itself, so we should first focus on the stop-with-savepoint process.

Currently, stop-with-savepoint first takes a savepoint and then cancels all the sources to stop the job. Are the sources all legacy sources (i.e., ones implementing SourceFunction), and do they implement the cancel() method correctly?
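
As a reference, a cooperatively cancellable legacy source usually looks roughly like the following sketch (not tied to any particular connector; just to illustrate the expected cancel() behaviour):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Minimal sketch of a legacy source: run() loops on a volatile flag
// and cancel() flips that flag so stop/cancel can terminate the task.
public class CancellableSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            // Emit under the checkpoint lock so emission and checkpointing do not interleave.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        // Invoked when the job is stopped or cancelled; must make run() return.
        running = false;
    }
}
```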

Best,
Yun

Re: Failed to cancel a job using the STOP rest API

Thomas Wang
Hi Yun,

Thanks for your reply. We are not using any legacy source. For this specific job, there is only one source, which uses FlinkKafkaConsumer, and I assume it has the cancel() method implemented correctly.

Also could you suggest how I could use the "request-id" to get the savepoint location?

Thanks.

Thomas

Re: Re: Failed to cancel a job using the STOP rest API

Yun Gao
Hi Thomas,

To query the savepoint status, a GET request can be issued to /jobs/:jobid/savepoints/:triggerid [1]; the response contains the status and, once completed, the location of the savepoint. However, if the job is running in some kind of per-job mode and the JobMaster is gone after the stop-with-savepoint, the request might no longer be served.
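
A rough sketch of polling that endpoint with Java 11's HttpClient (the JobManager address, job ID, and trigger ID are placeholders; the trigger ID is the "request-id" returned by the stop call):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointStatusQuery {
    public static void main(String[] args) throws Exception {
        // Placeholders: JobManager address, job ID, and the "request-id"
        // that the POST /jobs/:jobid/stop call returned.
        String base = "http://jobmanager-host:33557";
        String jobId = "5dce58f02d1f5e739ab12f88b2f5f031";
        String triggerId = "<request-id from the stop response>";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(base + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // While the savepoint is running the body reports an in-progress status;
        // once it completes, the savepoint location is included in the response
        // (see [1] for the exact fields).
        System.out.println(response.body());
    }
}
```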

For the Kafka source, did you find any exceptions or messages in the TaskManager's log when the job could not be stopped?

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid



Re: Re: Failed to cancel a job using the STOP rest API

Thomas Wang
Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```

Re: Re: Failed to cancel a job using the STOP rest API

Thomas Wang
One thing I noticed is that if I set drain = true, the job can be stopped correctly. Maybe that's because I'm using a Parquet file sink, which is a bulk-encoded format that only writes to disk during checkpoints?

Thomas

Re: Re: Re: Failed to cancel a job using the STOP rest API

Yun Gao
Hi Thomas,

Thanks a lot for reporting the exceptions; this does not look like it is working as expected to me. Could you also show us the DAG of the job? And do any operators in the source task use multiple threads to emit records?

Best,
Yun


Re: Re: Re: Failed to cancel a job using the STOP rest API

Thomas Wang
This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink with the Parquet format. I'm using only Flink's APIs and nothing custom.
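
Roughly, the topology is equivalent to the sketch below (topic, bucket, group id, and the Event type are placeholders; the real job uses our own record type and schema):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToS3ParquetJob {

    // Placeholder record type; the real job uses our own event type.
    public static class Event {
        public String raw;
        public Event() {}
        public Event(String raw) { this.raw = raw; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // bulk (Parquet) part files roll on checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");   // placeholder

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        StreamingFileSink<Event> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3://<BUCKET-NAME>/output"),
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .build();

        env.addSource(source)
           .assignTimestampsAndWatermarks(
                   WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)))
           .map(Event::new).returns(Event.class)
           .addSink(sink);

        env.execute("kafka-to-s3-parquet");
    }
}
```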

Thomas

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Yun Gao
Hi Thomas,

I tried but could not reproduce the exception yet. I have filed an issue for the exception first [1].

Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Kezhu Wang
Could it be the same as FLINK-21028 [1] (titled "Streaming application didn't stop properly", fixed in 1.11.4, 1.12.2, and 1.13.0)?

Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao ([hidden email]) wrote:

Hi Thomas,

I tried but do not re-produce the exception yet. I have filed 
an issue for the exception first [1].





------------------Original Mail ------------------
Sender:Thomas Wang <[hidden email]>
Send Date:Tue Jun 8 07:45:52 2021
Recipients:Yun Gao <[hidden email]>
CC:user <[hidden email]>
Subject:Re: Re: Re: Failed to cancel a job using the STOP rest API
This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink w/ Parquet format. I'm all using Flink's API and nothing custom.

Thomas

On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <[hidden email]> wrote:
Hi Thoms,

Very thanks for reporting the exceptions, and it seems to be not work as expected to me... 
Could you also show us the dag of the job ? And does some operators in the source task
use multiple-threads to emit records?

Best,
Yun


------------------Original Mail ------------------
Sender:Thomas Wang <[hidden email]>
Send Date:Sun Jun 6 04:02:20 2021
Recipients:Yun Gao <[hidden email]>
CC:user <[hidden email]>
Subject:Re: Re: Failed to cancel a job using the STOP rest API
One thing I noticed is that if I set drain = true, the job could be stopped correctly. Maybe that's because I'm using a Parquet file sink which is a bulk-encoded format and only writes to disk during checkpoints?

Thomas

On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <[hidden email]> wrote:
Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```

On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <[hidden email]> wrote:
Hi Thomas,

For querying the savepoint status, a GET request can be issued to
/jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and location
of the savepoint. However, if the job is running in some kind of per-job mode and
the JobMaster is gone after the stop-with-savepoint, the request might no longer be
available.

For the Kafka source, have you found any exceptions or messages in the
TaskManager's log when the job could not be stopped?

Best,
Yun
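
A minimal sketch of that flow, assuming Python's `requests` library and the endpoints described above (the JobManager address, bucket name and job id below are placeholders, and the response field names follow Flink's asynchronous-operation format as I understand it):

```python
import time
import requests

# Placeholder values -- substitute the real JobManager address and job id.
BASE = "http://<JOBMANAGER-HOST>:8081"
JOB_ID = "5dce58f02d1f5e739ab12f88b2f5f031"

# Trigger stop-with-savepoint; the 202 response body carries the "request-id",
# which doubles as the savepoint trigger id.
resp = requests.post(
    f"{BASE}/jobs/{JOB_ID}/stop",
    json={"drain": False, "targetDirectory": "s3://<BUCKET-NAME>/flink-savepoint"},
)
trigger_id = resp.json()["request-id"]

# Poll /jobs/:jobid/savepoints/:triggerid until the operation finishes.
while True:
    info = requests.get(f"{BASE}/jobs/{JOB_ID}/savepoints/{trigger_id}").json()
    if info["status"]["id"] != "IN_PROGRESS":
        break
    time.sleep(2)

# On success, the "operation" object should contain the savepoint path under
# "location"; on failure it should carry a "failure-cause" instead.
print(info.get("operation", {}))
```

Note that, as Yun points out, in per-job mode the JobManager may go away once the job reaches a terminal state, so the poll itself can fail even though the savepoint completed; in that case the savepoint path can instead be found in the JobManager logs or the target directory.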






Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Piotr Nowojski-4
Yes, good catch Kezhu. The IllegalStateException sounds very much like FLINK-21028.

Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't been released yet.)

Piotrek

On Tue, Jun 8, 2021 at 17:18 Kezhu Wang <[hidden email]> wrote:
Could it be the same as FLINK-21028 [1] (titled “Streaming application didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0)?

[1] https://issues.apache.org/jira/browse/FLINK-21028

Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao ([hidden email]) wrote:

Hi Thomas,

I tried but could not reproduce the exception yet. I have filed
an issue for the exception first [1].





------------------Original Mail ------------------
Sender:Thomas Wang <[hidden email]>
Send Date:Tue Jun 8 07:45:52 2021
Recipients:Yun Gao <[hidden email]>
CC:user <[hidden email]>
Subject:Re: Re: Re: Failed to cancel a job using the STOP rest API
This is actually a very simple job that reads from Kafka and writes to S3 using the StreamingFileSink with the Parquet format. I'm using only Flink's built-in APIs and nothing custom.

Thomas

On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <[hidden email]> wrote:
Hi Thomas,

Many thanks for reporting the exceptions; this does not seem to work as expected to me...
Could you also show us the DAG of the job? And do some operators in the source task
use multiple threads to emit records?

Best,
Yun
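
As an aside, one easy way to share the DAG is to pull the job's plan from the REST endpoint rather than screenshotting the UI. A small sketch, reusing the placeholder values from the first snippet:

```python
import json
import requests

BASE = "http://<JOBMANAGER-HOST>:8081"  # placeholder
JOB_ID = "5dce58f02d1f5e739ab12f88b2f5f031"

# /jobs/:jobid/plan returns the job graph (nodes, parallelism, ship strategies)
# as JSON, which is convenient to attach to a mailing-list reply.
plan = requests.get(f"{BASE}/jobs/{JOB_ID}/plan").json()
print(json.dumps(plan, indent=2))
```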


------------------Original Mail ------------------
Sender:Thomas Wang <[hidden email]>
Send Date:Sun Jun 6 04:02:20 2021
Recipients:Yun Gao <[hidden email]>
CC:user <[hidden email]>
Subject:Re: Re: Failed to cancel a job using the STOP rest API
One thing I noticed is that if I set drain = true, the job can be stopped correctly. Maybe that's because I'm using a Parquet file sink, which is a bulk-encoded format and only writes to disk during checkpoints?

Thomas
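
For context, the only difference on the wire between the two behaviours described above is the `drain` flag in the request body; with drain = true, Flink advances the watermark to MAX_WATERMARK before taking the savepoint, so pending event-time results are flushed to the sinks. A hedged sketch, reusing the placeholder values from the first snippet:

```python
import requests

BASE = "http://<JOBMANAGER-HOST>:8081"   # placeholder
JOB_ID = "5dce58f02d1f5e739ab12f88b2f5f031"
TARGET = "s3://<BUCKET-NAME>/flink-savepoint"

def stop_with_savepoint(drain: bool) -> str:
    """Trigger stop-with-savepoint and return the trigger id from "request-id"."""
    resp = requests.post(
        f"{BASE}/jobs/{JOB_ID}/stop",
        json={"drain": drain, "targetDirectory": TARGET},
    )
    resp.raise_for_status()
    return resp.json()["request-id"]

# drain=False: savepoint, then stop, without emitting MAX_WATERMARK.
stop_with_savepoint(drain=False)

# drain=True: emit MAX_WATERMARK first so event-time timers fire and
# pending results are flushed -- the variant observed to stop cleanly above.
# stop_with_savepoint(drain=True)
```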


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Yun Gao
Many thanks, Kezhu, for the catch; it also looks to me like the same issue as FLINK-21028.


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Thomas Wang
Thanks, everyone. I'm using Flink on EMR. I just updated to EMR 6.3, which uses Flink 1.12.1. I will report back on whether this resolves the issue.

Thomas


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Piotr Nowojski-4
Hi Thomas. The bug https://issues.apache.org/jira/browse/FLINK-21028 is still present in 1.12.1. You would need to upgrade to at least 1.13.0, 1.12.2, or 1.11.4. However, as I mentioned before, 1.11.4 hasn't been released yet. On the other hand, both 1.12.2 and 1.13.0 have already been superseded by more recent minor releases (1.13.1 and 1.12.4, respectively).

Piotrek


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

Thomas Wang
Thanks, Piotr. Unfortunately, we are using Amazon EMR, and its latest release, EMR 6.3.0, uses Flink 1.12.1. We don't have a lot of control over that.


Thomas
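
One way to confirm which Flink build the EMR cluster is actually running (and therefore whether it already carries the FLINK-21028 fix) is to ask the REST endpoint for its reported version. A small sketch, again assuming the `requests` library and a placeholder JobManager address; the exact fields in the /config response may vary by version:

```python
import requests

BASE = "http://<JOBMANAGER-HOST>:8081"  # placeholder

# The web UI's /config endpoint reports the version and commit the cluster
# was built from, e.g. {"flink-version": "1.12.1", "flink-revision": "..."}.
config = requests.get(f"{BASE}/config").json()
print(config.get("flink-version"), config.get("flink-revision"))
```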
