TaskManager fails and crashes when I cancel the job

Soheil Pourbafrani
Hi,

I developed a single Flink job that reads a huge number of files and, after some simple preprocessing, sinks the records into a database. I use the built-in JDBCOutputFormat to insert records into the database. The problem is that when I cancel the job, using either the Web UI or the command line, the job does not cancel completely and, eventually, the TaskManager process crashes!
Here are the TaskManager logs (generated continuously for several seconds):

2020-02-15 01:17:17,208 WARN  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The reader is stuck in method:
 java.lang.Object.wait(Native Method)
org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856)
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
2020-02-15 01:17:17,224 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
2020-02-15 01:17:17,225 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.

I'm using:
Flink: 1.7.2
Java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14)

Any help will be appreciated.

All the best, 
Soheil

Re: TaskManager fails and crashes when I cancel the job

Zhu Zhu
Hi Soheil,

I think the root cause is that, during cancellation, the task was stuck in

org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
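For context on why a single record can hold the task for so long: the stack trace shows JDBCOutputFormat.writeRecord calling flush, which in turn calls PgPreparedStatement.executeBatch. The following is a simplified, hypothetical model (the class BatchingWriter and its batchInterval threshold are assumptions for illustration, not Flink's actual JDBCOutputFormat code) in which records are only buffered until a threshold is reached, so the record that fills the batch blocks for the whole database round trip.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a batching JDBC-style output format.
// It is NOT Flink's JDBCOutputFormat; it only illustrates why one
// writeRecord() call can block for as long as a whole batch takes.
class BatchingWriter<T> {
    private final int batchInterval;               // assumed flush threshold (rows)
    private final Consumer<List<T>> executeBatch;  // stands in for PreparedStatement.executeBatch()
    private final List<T> buffer = new ArrayList<>();
    private int flushCount = 0;

    BatchingWriter(int batchInterval, Consumer<List<T>> executeBatch) {
        if (batchInterval <= 0) {
            throw new IllegalArgumentException("batchInterval must be positive");
        }
        this.batchInterval = batchInterval;
        this.executeBatch = executeBatch;
    }

    // Most calls only buffer; the call that fills the batch also flushes,
    // so it blocks until the (simulated) database has applied the batch.
    void writeRecord(T record) {
        buffer.add(record);
        if (buffer.size() >= batchInterval) {
            flush();
        }
    }

    // Sends whatever is buffered, then clears the buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        executeBatch.accept(new ArrayList<>(buffer));
        buffer.clear();
        flushCount++;
    }

    int flushCount() { return flushCount; }

    int buffered() { return buffer.size(); }
}
```

With a threshold of 3, writing seven records triggers two flushes of three rows each and leaves one row buffered until the final flush() call.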


The TaskManager process exit is expected in this case: it forces a failure and recovery.
To be specific, when a task on the TM is canceled, a TaskCancelerWatchDog is started to watch the cancellation.
If the cancellation times out, the watchdog triggers a fatal error that forces the TM to exit.
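The watchdog behaviour described above can be modelled with plain Java threads. This is a simplified sketch under stated assumptions: CancellationWatchdog, cancelWithWatchdog and the two task factories are hypothetical names, and Flink's real TaskCancelerWatchDog differs in detail. What it shows is that interrupting a thread only helps if the code it is running reacts to the interrupt; a call that keeps waiting regardless outlives the timeout and ends up on the fatal-error path.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, hypothetical model of a cancellation watchdog (not Flink's code):
// interrupt the task thread, wait up to a timeout, escalate if it is still alive.
class CancellationWatchdog {

    /** Returns true if the task thread stopped in time, false if the watchdog escalated. */
    static boolean cancelWithWatchdog(Thread taskThread, long timeoutMillis, Runnable fatalErrorHandler) {
        if (timeoutMillis <= 0) {
            // Thread.join(0) would wait forever, so this model requires a positive timeout.
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        taskThread.interrupt();              // ask the task to stop
        try {
            taskThread.join(timeoutMillis);  // wait at most timeoutMillis for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve our own interrupt status
        }
        if (taskThread.isAlive()) {
            fatalErrorHandler.run();         // in Flink this would bring down the TaskManager
            return false;
        }
        return true;
    }

    // A task that reacts to interruption by ending promptly.
    static Thread cooperativeTask() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Interrupted by the canceller: fall through and let the thread end.
            }
        });
        t.setDaemon(true);
        return t;
    }

    // A task that swallows interrupts; an assumed stand-in for a call that
    // cannot be interrupted, such as one blocked inside a driver.
    static Thread stubbornTask(AtomicBoolean release) {
        Thread t = new Thread(() -> {
            while (!release.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ignored) {
                    // Interrupt swallowed: the loop keeps running.
                }
            }
        });
        t.setDaemon(true);
        return t;
    }
}
```

The cooperative task stops within the timeout, so cancelWithWatchdog returns true; the stubborn one is still alive when the timeout expires, so the fatal-error handler runs and the method returns false.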

I think you may need to diagnose why the PostgreSQL call took so long to flush the data.
Alternatively, if the long flush time is expected, you can increase the cancellation timeout ("task.cancellation.timeout", in milliseconds) to avoid this issue.
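To see how long each flush actually takes, one option is to time the batch execution and compare it with the cancellation timeout. FlushTimer.timed is a hypothetical helper, not part of Flink or the PostgreSQL driver, and the warning threshold is an assumption you would choose relative to your own "task.cancellation.timeout" value.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

// Hypothetical diagnostic helper: runs a blocking call, measures its duration,
// and reports it when it reaches the given threshold (even if the call throws).
class FlushTimer {

    static <T> T timed(Callable<T> call, long warnAfterMillis, LongConsumer onSlowCall) throws Exception {
        long start = System.nanoTime();
        try {
            return call.call();
        } finally {
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            if (elapsedMillis >= warnAfterMillis) {
                onSlowCall.accept(elapsedMillis); // e.g. log a warning with the duration
            }
        }
    }
}
```

In a custom sink this could wrap the JDBC call, for example FlushTimer.timed(statement::executeBatch, warnMillis, ms -> ...), where statement is a java.sql.PreparedStatement; any SQLException from executeBatch propagates unchanged through the Callable.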

Thanks,
Zhu Zhu

(In reply to Soheil Pourbafrani <[hidden email]>, who wrote on Sat, Feb 15, 2020 at 6:41 AM.)