Exception in BucketingSink when cancelling Flink job

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Exception in BucketingSink when cancelling Flink job

wangsan
Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the flink job was cancelled, we always got Exception in BucketingSink's  close method. The datailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
        .......
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

It seems that DFSOutputStream haven't been closed before task thread is force terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cacelling a job?

Best,
wangsan



Reply | Threaded
Open this post in threaded view
|

Re:Exception in BucketingSink when cancelling Flink job

wangsan

After digging into the source code, we found that when Flink job is canceled, a TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the
task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}//......executer.interrupt();try {
  executer.join(interruptInterval);
}catch (InterruptedException e) {  // we can ignore this}//......

Notice that TaskCanceler first send interrupt signal to task thread, and following with join method. And since the task thread is now try to close DFSOutputStream, which is waiting for ack, thus InterruptedException is throwed out in task thread.

synchronized (dataQueue) {while (!streamerClosed) {
  checkClosed();  if (lastAckedSeqno >= seqno) {    break;
  }  try {
    dataQueue.wait(1000); // when we receive an ack, we notify on
    // dataQueue
  } catch (InterruptedException ie) {    throw new InterruptedIOException(        "Interrupted while waiting for data to be acknowledged by pipeline");
  }
}

I was confused why TaskCanceler call executer.interrupt() before executer.join(interruptInterval). Can anyone help?



Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the flink job was cancelled, we always got Exception in BucketingSink's  close method. The datailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
        .......
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

It seems that DFSOutputStream haven't been closed before task thread is force terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cacelling a job?

Best,
wangsan



Reply | Threaded
Open this post in threaded view
|

Re: Exception in BucketingSink when cancelling Flink job

Stefan Richter
Hi,

I would speculate that the reason for this order is that we want to shutdown the tasks quickly by interrupting blocking calls in the event of failure, so that recover can begin as fast as possible. I am looping in Stephan who might give more details about this code.

Best,
Stefan 

Am 27.09.2017 um 07:33 schrieb wangsan <[hidden email]>:

After digging into the source code, we found that when Flink job is canceled, a TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the
task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}//......executer.interrupt();try {
  executer.join(interruptInterval);
}catch (InterruptedException e) {  // we can ignore this}//......

Notice that TaskCanceler first send interrupt signal to task thread, and following with join method. And since the task thread is now try to close DFSOutputStream, which is waiting for ack, thus InterruptedException is throwed out in task thread.

synchronized (dataQueue) {while (!streamerClosed) {
  checkClosed();  if (lastAckedSeqno >= seqno) {    break;
  }  try {
    dataQueue.wait(1000); // when we receive an ack, we notify on
    // dataQueue
  } catch (InterruptedException ie) {    throw new InterruptedIOException(        "Interrupted while waiting for data to be acknowledged by pipeline");
  }
}

I was confused why TaskCanceler call executer.interrupt() before executer.join(interruptInterval). Can anyone help?





Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the flink job was cancelled, we always got Exception in BucketingSink's  close method. The datailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
        .......
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

It seems that DFSOutputStream haven't been closed before task thread is force terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cacelling a job?

Best,
wangsan






Reply | Threaded
Open this post in threaded view
|

Re:Re: Exception in BucketingSink when cancelling Flink job

wangsan
Hi,

'Join' method can be call with a timeout (as is called in TaskCanceler), so it won't be block forever if  the respective thread is in deadlock state. Maybe calling 'interrupt()'  after 'join(timeout)' is more reasonable, altought it still can not make sure operations inside 'close()' method is finished.

Best,
wangsan

在2017年09月29 01时52分, "Stephan Ewen"<[hidden email]>写道:

Hi!

Calling 'interrupt()' makes only sense before 'join()', because 'join()' blocks until the respective thread is finished.
The 'interrupt()' call happens to cancel the task out of potentially blocking I/O or sleep/wait operations.

The problem is that HDFS does not handle interrupts correctly, it sometimes deadlocks in the case of interrupts on unclosed streams :-(

I think it would be important to make sure (in the Bucketing Sink) that the DFS streams are closed upon task cancellation.
@aljoscha - adding you to this thread, as you know most about the bucketing sink.

Best,
Stephan


On Wed, Sep 27, 2017 at 10:18 AM, Stefan Richter <[hidden email]> wrote:
Hi,

I would speculate that the reason for this order is that we want to shutdown the tasks quickly by interrupting blocking calls in the event of failure, so that recover can begin as fast as possible. I am looping in Stephan who might give more details about this code.

Best,
Stefan 

Am 27.09.2017 um 07:33 schrieb wangsan <[hidden email]>:

After digging into the source code, we found that when Flink job is canceled, a TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the
task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}//......executer.interrupt();try {
  executer.join(interruptInterval);
}catch (InterruptedException e) {  // we can ignore this}//......

Notice that TaskCanceler first send interrupt signal to task thread, and following with join method. And since the task thread is now try to close DFSOutputStream, which is waiting for ack, thus InterruptedException is throwed out in task thread.

synchronized (dataQueue) {while (!streamerClosed) {
  checkClosed();  if (lastAckedSeqno >= seqno) {    break;
  }  try {
    dataQueue.wait(1000); // when we receive an ack, we notify on
    // dataQueue
  } catch (InterruptedException ie) {    throw new InterruptedIOException(        "Interrupted while waiting for data to be acknowledged by pipeline");
  }
}

I was confused why TaskCanceler call executer.interrupt() before executer.join(interruptInterval). Can anyone help?





Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the flink job was cancelled, we always got Exception in BucketingSink's  close method. The datailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
        .......
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

It seems that DFSOutputStream haven't been closed before task thread is force terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cacelling a job?

Best,
wangsan







Reply | Threaded
Open this post in threaded view
|

Re: Exception in BucketingSink when cancelling Flink job

Erik van Oosten
In reply to this post by wangsan
Hi Wangsan,

We were struggling with this for many days as well. In the end we found a work around. Well work-around, this for sure qualifies as one of the ugliest hacks I have ever contemplated. Our work-around for Flink immediately interrupting the close, is to continue closing on another thread! Here is an example in Scala:

class MyBucketingSink[A](basePath: String) extends BucketingSink[A](basePath) {

override def close(): Unit = {
//
// Unfortunately, Flink closes very very very very eagerly. So eagerly in fact that it will try to kill us by
// interrupting the current thread immediately. Let's try to continue on a different thread :evil-grin:
//

def superClose(): Unit = super.close()

new Thread(
new Runnable {
override def run(): Unit = {
logger.info("Close invoked on MyBucketingSink on task " + getRuntimeContext.getTaskNameWithSubtasks)
try {
superClose()
} catch {
case e: Throwable => logger.error(e)("Failed to close task " + getRuntimeContext.getTaskNameWithSubtasks)
}
}
},
"Closing task " + getRuntimeContext.getTaskNameWithSubtasks
).start()
}
}

Obviously, if the close hangs, the entire job will hang and Flink will need to be fully restarted.
Please let us know if you see any other problems with this approach.

Kind regards,
    Erik.



Op 27 sep. 2017, om 07:33 heeft wangsan <[hidden email]> het volgende geschreven:

After digging into the source code, we found that when Flink job is canceled, a TaskCanceler thread is created.

The TaskCanceler thread calls cancel() on the invokable and periodically interrupts the
task thread until it has terminated.

try {
  invokable.cancel();
} catch (Throwable t) {
  logger.error("Error while canceling the task {}.", taskName, t);
}//......executer.interrupt();try {
  executer.join(interruptInterval);
}catch (InterruptedException e) {  // we can ignore this}//......

Notice that TaskCanceler first send interrupt signal to task thread, and following with join method. And since the task thread is now try to close DFSOutputStream, which is waiting for ack, thus InterruptedException is throwed out in task thread.

synchronized (dataQueue) {while (!streamerClosed) {
  checkClosed();  if (lastAckedSeqno >= seqno) {    break;
  }  try {
    dataQueue.wait(1000); // when we receive an ack, we notify on
    // dataQueue
  } catch (InterruptedException ie) {    throw new InterruptedIOException(        "Interrupted while waiting for data to be acknowledged by pipeline");
  }
}

I was confused why TaskCanceler call executer.interrupt() before executer.join(interruptInterval). Can anyone help?





Hi,

We are currently using BucketingSink to save data into HDFS in parquet format. But when the flink job was cancelled, we always got Exception in BucketingSink's  close method. The datailed exception info is as below:
[ERROR] [2017-09-26 20:51:58,893] [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
        .......
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)

It seems that DFSOutputStream haven't been closed before task thread is force terminated. We found a similar problem in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html , but setting "akka.ask.timeout" to a larger value does not work for us. So how can we make sure the stream is safely closed when cacelling a job?

Best,
wangsan