Flink job repeated restart failure

vinaya
Dear all,

One of our Flink jobs failed with the exception below. Several attempts to restart the job resulted in the same exception, and the job failed each time. The job only started successfully after we changed the job name.

Flink Version: 1.11.2

Exception
2021-03-24 20:13:09,288 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-03-24 20:13:09,288 INFO  org.apache.kafka.clients.producer.KafkaProducer              [] - [Producer clientId=producer-2] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2021-03-24 20:13:09,304 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) (8905142514cb25adbd42980680562d31) switched from RUNNING to FAILED.
java.io.IOException: No such file or directory
        at java.io.UnixFileSystem.createFileExclusively(Native Method) ~[?:1.8.0_252]
        at java.io.File.createNewFile(File.java:1012) ~[?:1.8.0_252]
        at org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.createSpillingChannel(SpanningWrapper.java:291) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.updateLength(SpanningWrapper.java:178) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.serialization.SpanningWrapper.transferFrom(SpanningWrapper.java:111) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:85) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:146) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.12-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.12-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
2021-03-24 20:13:09,305 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) (8905142514cb25adbd42980680562d31).
2021-03-24 20:13:09,311 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Flat Map -> async wait operator -> Process -> Sink: Unnamed (1/1) 8905142514cb25adbd42980680562d31.

File: https://github.com/apache/flink/blob/release-1.11.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java

Related Jira ID: https://issues.apache.org/jira/browse/FLINK-18811

A similar exception is mentioned in FLINK-18811, which has a fix in 1.12.0. In our case, however, we did not notice any disk failure. Are there any other reasons for the above-mentioned IOException?

While we plan to upgrade to the latest Flink version, is there any other workaround in the meantime besides redeploying the job with a different job name?

Kind regards,
Vinaya

Re: Flink job repeated restart failure

Arvid Heise-4
Hi Vinaya,

SpillingAdaptiveSpanningRecordDeserializer tries to create a spill file in the temporary directory, which you can configure by setting io.tmp.dirs. By default, it is set to System.getProperty("java.io.tmpdir"), which seems to be invalid in your case. (Note that the directory has to exist on the task managers.)
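
For reference, setting it explicitly in flink-conf.yaml could look like the following sketch; the path is only an example, it must exist and be writable on every task manager, and multiple directories can be listed if desired:

  io.tmp.dirs: /data/flink-tmp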

Best,

Arvid

Re: Flink job repeated restart failure

vinaya
Hi Arvid,

Thank you for the suggestion.

Indeed, the specified setting (io.tmp.dirs) was commented out in the Flink configuration (flink-conf.yaml):

  # io.tmp.dirs: /tmp

However, "java.io.tmpdir" was configured to "/tmp" as can be verified from below command.

  $ java -XshowSettings
  VM settings:
      Max. Heap Size (Estimated): 6.89G
      Ergonomics Machine Class: server
      Using VM: OpenJDK 64-Bit Server VM

  Property settings:
      awt.toolkit = sun.awt.X11.XToolkit
      ...
      java.io.tmpdir = /tmp
      ...

From the task manager log:
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Client environment:java.io.tmpdir=/tmp

Also, read and write access to /tmp works:

  $ touch /tmp/somefile
  $ ls -l /tmp/somefile
  -rw-rw-r--. 1 centos centos 0 Mar 26 06:28 /tmp/somefile

Kind regards,
Vinaya




Re: Flink job repeated restart failure

Arvid Heise-4
Hi Vinaya,

java.io.tmpdir is already the fallback and I'm not aware of another level of fallback.

Ensuring java.io.tmpdir is valid is also relevant for some third-party libraries that rely on it (e.g. FileSystem implementations that cache local files). It's good practice to set it appropriately.
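
For example, both settings could point at a dedicated directory via flink-conf.yaml; the path below is just a placeholder and has to exist and be writable on every task manager:

  io.tmp.dirs: /data/flink-tmp
  env.java.opts: -Djava.io.tmpdir=/data/flink-tmp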

On Fri, Mar 26, 2021 at 6:32 AM vinaya <[hidden email]> wrote:

Is there a fallback (e.g. /tmp) if io.tmp.dirs and System.getProperty("java.io.tmpdir") are both not set?

Will configure this setting to a valid value as suggested.

Kind regards,
Vinaya


