Hi Jürgen,
I got your point from the log, but i think it can not do anything from flink side. The task receives the cancel command from master, and it will dipose the operator after task thread is interupted.
Maybe you can check if there are some parameters to set for waiting longer for ack in hdfs.
cheers,
zhijiang
------------------------------------------------------------------
发送时间:2017年4月13日(星期四) 15:32
主 题:Re: 回复:Changing timeout for cancel command
Hi zhijiang,
I checked this value and I haven't configured it so I think it should be the default 10s. I checked how long the flink cancel command took with the time command and it was finished after 6 seconds.
After filtering out the messages of one Sink, it looks like it interrupts it in the area of milliseconds. Here are the logs from one taskmanager (standalone cluster).
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task - Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed) switched from RUNNING to CANCELING.
06:53:16,484 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,503 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
06:53:16,503 ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync! java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
atorg.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
atorg.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
atorg.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
atorg.apache.flink.streaming.connectors.fs.StreamWriterBase.hflushOrSync(StreamWriterBase.java:72)
atorg.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:131)
atorg.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:146)
atorg.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
atorg.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
atorg.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
atorg.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
atorg.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
atorg.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
06:53:16,504 INFO org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Sink: /tmp/flink/events_invalid HDFS sink (2/2)
06:53:16,504 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: /tmp/flink/events_invalid HDFS sink (477db6e41932ad9b60c72e14de4488ed)
Best,
Jürgen
On 13.04.2017 07:05, Zhijiang(wangzhijiang999) wrote:
Hi Jürgen,
You can set the timeout in the configuration by this key "akka.ask.timeout", and the current default value is 10 s. Hope it can help you.
cheers,
zhijiang
------------------------------------------------------------------
发送时间:2017年4月12日(星期三) 19:04
主 题:Changing timeout for cancel command
Hi,
We currently get the following exception if we cancel a job which writes
to Hadoop:
ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
- Error while trying to hflushOrSync! java.io.InterruptedIOException:
Interrupted while waiting for data to be acknowledged by pipeline
This causes problem if we cancel a job with creating a savepoint and
resubmitting the job because the file is sometimes at the end smaller
than the file size specified in the valid-length file.
Is there a way to increase the time out during cancel to give the flush
a bit more time? We currently lose events if this happens.
Best,
Jürgen