Hi there,
I'm currently analyzing some strange behavior of one of our jobs running on YARN with Flink 1.11.2. My situation is a bit special in that I submit a single streaming job with a disjoint job graph, i.e. the job contains two graphs of the same kind that are completely independent of each other (one ETL pipeline for source1, another for source2). This is purely for historical reasons and makes deployment a bit easier.
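To make the setup clearer, here is a minimal, self-contained sketch of how such a disjoint graph comes about (not our actual code: a dummy source and print sinks stand in for the real Kafka connectors and the intermediate operators). Two independent pipelines are built on the same StreamExecutionEnvironment and submitted with a single execute() call:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class DisjointGraphSketch {

    /** Trivial endless source standing in for the real Kafka consumers. */
    public static class DummySource implements SourceFunction<String> {
        private final String prefix;
        private volatile boolean running = true;

        public DummySource(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long i = 0;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(prefix + "-" + i++);
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Pipeline for source1
        env.addSource(new DummySource("source1")).name("rawdata_source1")
           .map(value -> value.toUpperCase()).name("map_json_source1")
           .print().name("write_to_kafka_source1");

        // Pipeline for source2: same shape, but shares no edges with the first pipeline
        env.addSource(new DummySource("source2")).name("rawdata_source2")
           .map(value -> value.toUpperCase()).name("map_json_source2")
           .print().name("write_to_kafka_source2");

        // A single execute() submits ONE job whose job graph consists of
        // two disconnected components.
        env.execute("combined-etl-job");
    }
}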
I had the job running nicely until I wanted to stop it with a savepoint, as usual, like so:
flink stop -p ${SAVEPOINT_BASEDIR}/${FLINK_JOB_NAME}/SAVEPOINTS --yarnapplicationId=${FLINK_YARN_APPID} ${ID}
After a minute, I received a timeout exception [1].
Now, the interesting parts / possible bugs are the following three:
1. The stop was triggered at 2021-01-12 06:15:10. I see that almost all tasks of the entire job switched from RUNNING to FINISHED within seconds, but two tasks hit what looks like a race condition on shutdown: they threw an IllegalStateException in BufferBuilder.append, where a precondition check ensures that the buffer is not yet finished [2].
2. That failure led to RESTARTING the tasks of the job. So the failure occurred 5 seconds after I triggered the stop, and 2 seconds later I see that the pipeline switched its state back to RUNNING. No wonder the "stop" eventually failed with a timeout, as the job no longer had any intention of shutting down.
3. BUT the major issue for me is this: the entire pipeline of source1 was restarted, but the pipeline of source2 remained FINISHED. As Flink has done quite a bit of work on batch/streaming unification and region failover/restart in recent versions, my guess is that, because I am in the special case of a disjoint graph, only the tasks in the connected subgraph where the error occurred were restarted, while the other subgraph was left in FINISHED state, even though this is a streaming job.
The problem is that the job was left in a kind of broken state: just looking at the YARN / Flink UI, it seemed to still be running and the stop appeared to have had no effect, but in reality a large part of the job had been shut down. My workarounds, of course, are the following:
1. If a "graceful stop" won't succeed, in future I will trigger a hard kill "yarn application -kill" afterwards because I can't be certain in what state the job is after a failed attempt to stop.
2. I will enforce stronger isolation of my jobs so that each job is always a connected graph. In my case, I will deploy two independent jobs for the two ETL pipelines and hope that this problem won't arise again (at least then an entire job will be either FINISHED or RUNNING); a sketch of what that looks like is below.
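For illustration, this is roughly what the split would look like (again only a sketch, with socketTextStream and print() as stand-ins for the real Kafka source/sink and "source1-host" as a placeholder host). The point is one connected graph per execute(), i.e. per YARN application:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source1EtlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Only the source1 pipeline lives in this job; socketTextStream is a
        // stand-in for the real Kafka consumer.
        env.socketTextStream("source1-host", 9999)
           .map(value -> value.toUpperCase()).name("map_json_source1")
           .print().name("write_to_kafka_source1");

        // One connected graph per job; source2 gets its own main class and its
        // own "flink run" / YARN application.
        env.execute("etl-source1");
    }
}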
But I'm curious what you think: are these three bugs, or are some of them expected behavior? Should I open bug ticket(s) for them?
Best regards
Theo
[1] Timeout from flink stop:
org.apache.flink.util.FlinkException: Could not stop with a savepoint job "f23290bf5fb0ecd49a4455e4a65f2eb6".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
... 9 more
[2] Exception in graceful shutdown:
2021-01-12T06:15:15.827877+01:00 WARN org.apache.flink.runtime.taskmanager.Task Source: rawdata_source1 -> validation_source1 -> enrich_source1 -> map_json_source1 -> Sink: write_to_kafka_source1) (3/18) (bc68320cf69dd877782417a3298499d6) switched from RUNNING to FAILED.
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
at org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter.onPeriodicEmit(AssignerWithPeriodicWatermarksAdapter.java:54)
at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
... 13 more
Caused by: java.lang.RuntimeException
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
... 25 more
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
... 29 more