Hi Soheil,
I think the root cause is that, during cancellation, the task was stuck in
org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
The TaskManager process exit is expected in this case, to enforce a failure and recovery.
To be specific, when a task on the TM is to be canceled, a TaskCancelerWatchDog will be started to watch the cancellation.
If the cancellation times out, the watchdog triggers a fatal error to force the TM to exit.
I think you may need to diagnose why the PostgreSQL call took so long to flush the data.
Alternatively, if such long flush times are expected, you can increase the cancellation timeout ("task.cancellation.timeout") to avoid this issue.
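For example, a minimal sketch of raising the timeout from the job code (assuming Flink 1.7.x; the 300000 ms value is only an illustration, and the same setting can also be put into flink-conf.yaml as task.cancellation.timeout):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CancellationTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Allow tasks up to 5 minutes to finish cancellation before the watchdog
        // triggers a fatal error (default is 180000 ms; 0 disables the watchdog).
        env.getConfig().setTaskCancellationTimeout(300000L);
        // ... build the rest of the job on env as usual ...
    }
}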
Thanks,
Zhu Zhu
Hi,
I developed a single Flink job that reads a huge number of files and, after some simple preprocessing, sinks them into the database. I use the built-in JDBCOutputFormat to insert the records into the database. The problem is that when I cancel the job, using either the WebUI or the command line, the job does not cancel completely and eventually the TaskManager process crashes!
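For context, the sink is wired up roughly like this (just a sketch; the input path, table, query, and connection settings are illustrative placeholders, not my real values):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class FileToJdbcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input files; each line becomes a one-field Row
        // (the real preprocessing is omitted here).
        DataStream<Row> records = env
                .readTextFile("file:///data/input")
                .map(new MapFunction<String, Row>() {
                    @Override
                    public Row map(String line) {
                        Row row = new Row(1);
                        row.setField(0, line);
                        return row;
                    }
                });

        // JDBCOutputFormat buffers records and flushes them in batches;
        // the flush() call is where the task is stuck in the trace below.
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
                .setUsername("user")
                .setPassword("secret")
                .setQuery("INSERT INTO records (payload) VALUES (?)")
                .setSqlTypes(new int[]{java.sql.Types.VARCHAR})
                .setBatchInterval(1000)
                .finish();

        records.writeUsingOutputFormat(jdbcOutput);

        env.execute("file-to-jdbc");
    }
}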
Here are the TaskManager logs (generated continuously for several seconds):
2020-02-15 01:17:17,208 WARN org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator - The reader is stuck in method:
java.lang.Object.wait(Native Method)
org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856)
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
2020-02-15 01:17:17,224 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
2020-02-15 01:17:17,225 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
I'm using:
Flink: 1.7.2
Java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Any help will be appreciated.
All the best,
Soheil