Re: Changing timeout for cancel command

Re: Changing timeout for cancel command

Zhijiang(wangzhijiang999)
Hi Jürgen,

     You can set the timeout in the configuration with the key "akka.ask.timeout"; its current default value is 10 s. I hope this helps.
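For illustration, the setting is a single line in flink-conf.yaml, for example "akka.ask.timeout: 60 s". The sketch below is a hypothetical helper (not Flink's own configuration loader) that looks the key up in such "key: value" lines and falls back to the 10 s default mentioned above; as an assumption for brevity, it only understands values given in seconds.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical helper: reads akka.ask.timeout from flink-conf.yaml-style lines. */
public class AskTimeoutLookup {

    static final String KEY = "akka.ask.timeout";
    // Default value quoted in the thread.
    static final Duration DEFAULT = Duration.ofSeconds(10);

    /** Returns the configured ask timeout, or DEFAULT if the key is absent. */
    static Duration askTimeout(List<String> confLines) {
        for (String raw : confLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = line.indexOf(':');
            if (colon < 0 || !line.substring(0, colon).trim().equals(KEY)) {
                continue; // not the key we are looking for
            }
            return parseSeconds(line.substring(colon + 1).trim());
        }
        return DEFAULT;
    }

    /** Parses values such as "10 s" or "60s"; other units are not handled in this sketch. */
    static Duration parseSeconds(String value) {
        String digits = value.endsWith("s")
                ? value.substring(0, value.length() - 1).trim()
                : value;
        return Duration.ofSeconds(Long.parseLong(digits));
    }

    public static void main(String[] args) {
        List<String> conf = List.of(
                "jobmanager.rpc.address: localhost",
                "# raise the RPC ask timeout from the 10 s default",
                "akka.ask.timeout: 60 s");
        System.out.println(askTimeout(conf)); // PT1M
    }
}
```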


cheers,
zhijiang


------------------------------------------------------------------
From: Jürgen Thomann <[hidden email]>
Sent: Wednesday, April 12, 2017 19:04
To: user <[hidden email]>
Subject: Changing timeout for cancel command

Hi,

We currently get the following exception if we cancel a job which writes to Hadoop:
ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync! java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline

This causes problems if we cancel a job with a savepoint and resubmit it, because at the end the file is sometimes smaller than the file size specified in the valid-length file.
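One way to find the affected files after a restore is to compare each part file's actual size with the length recorded in its valid-length file. A minimal sketch, assuming the length is stored as a decimal string written with DataOutputStream.writeUTF (which is how BucketingSink appears to write it in the Flink 1.2 line; verify this against the version you run). The class name and the file names in main are hypothetical.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical check: is a part file shorter than its recorded valid length? */
public class ValidLengthCheck {

    /** Reads the length, assuming it was stored via DataOutputStream.writeUTF(Long.toString(n)). */
    static long readValidLength(Path validLengthFile) throws IOException {
        try (DataInputStream in = new DataInputStream(Files.newInputStream(validLengthFile))) {
            return Long.parseLong(in.readUTF());
        }
    }

    /** Bytes missing from the part file; 0 means the file covers the recorded length. */
    static long missingBytes(Path partFile, Path validLengthFile) throws IOException {
        long recorded = readValidLength(validLengthFile);
        long actual = Files.size(partFile);
        return Math.max(0, recorded - actual);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path part = dir.resolve("part-0-0");                 // hypothetical file names
        Path valid = dir.resolve("_part-0-0.valid-length");

        Files.write(part, new byte[80]);                     // only 80 bytes reached the file
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(valid))) {
            out.writeUTF(Long.toString(100));                // but 100 bytes were recorded as valid
        }
        System.out.println("missing bytes: " + missingBytes(part, valid)); // missing bytes: 20
    }
}
```

A non-zero result marks exactly the situation described above: the valid-length file promises more bytes than the part file actually contains.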

Is there a way to increase the timeout during cancellation to give the flush a bit more time? We currently lose events when this happens.

Best,
Jürgen

Re: Re: Changing timeout for cancel command

Jürgen Thomann

Hi zhijiang,

I checked this value. I haven't configured it, so it should be the default of 10 s. I also measured how long the flink cancel command took with the time command, and it finished after 6 seconds.

After filtering the log for the messages of one sink, it looks like the sink is interrupted within milliseconds. Here are the logs from one TaskManager (standalone cluster).

06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed) switched from RUNNING to CANCELING.
06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Sink: /tmp/flink/events_invalid HDFS sink (2/2) (477db6e41932ad9b60c72e14de4488ed).
06:53:16,503 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
06:53:16,503 ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - Error while trying to hflushOrSync! java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
        at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:2038)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
        at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
        at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.hflushOrSync(StreamWriterBase.java:72)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.flush(StreamWriterBase.java:131)
        at org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:146)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:554)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:423)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
06:53:16,504 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Sink: /tmp/flink/events_invalid HDFS sink (2/2)
06:53:16,504 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: /tmp/flink/events_invalid HDFS sink (477db6e41932ad9b60c72e14de4488ed)
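The "within milliseconds" observation can be checked from the timestamps above: the cancel attempt is logged at 06:53:16,484 and the hflushOrSync error at 06:53:16,503. A small sketch that computes the gap between two log lines in this HH:mm:ss,SSS format (class and method names are illustrative):

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/** Computes the time between the leading timestamps of two log lines. */
public class LogGap {

    // Matches timestamps such as "06:53:16,484" at the start of each log line.
    static final DateTimeFormatter LOG_TIME = DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    /** Parses the first 12 characters of a log line as a time of day. */
    static LocalTime timestampOf(String logLine) {
        return LocalTime.parse(logLine.substring(0, 12), LOG_TIME);
    }

    static Duration gap(String earlier, String later) {
        return Duration.between(timestampOf(earlier), timestampOf(later));
    }

    public static void main(String[] args) {
        String cancel = "06:53:16,484 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task ...";
        String error = "06:53:16,503 ERROR org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - Error while trying to hflushOrSync! ...";
        System.out.println(gap(cancel, error).toMillis() + " ms"); // 19 ms
    }
}
```

A 19 ms gap is far below the 10 s akka.ask.timeout default, which suggests the RPC timeout is not what cuts the flush short.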

Best,
Jürgen

On 13.04.2017 07:05, Zhijiang(wangzhijiang999) wrote:
[...]

Re: Re: Changing timeout for cancel command

Zhijiang(wangzhijiang999)
In reply to this post by Zhijiang(wangzhijiang999)
Hi Jürgen, 

    I see your point from the log, but I don't think anything can be done on the Flink side. The task receives the cancel command from the master, and it disposes of the operator after the task thread has been interrupted.
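The mechanism described here can be reproduced in miniature with plain Java threads. This is a simulation, not Flink or HDFS code: a latch stands in for the datanode pipeline acknowledgement, the task thread blocks on it the way the close path blocks in hflush, and the canceling thread interrupts it. Wrapping the InterruptedException in an InterruptedIOException mirrors the exception in the log above.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Simulation: a flush waiting for an ack fails as soon as its thread is interrupted. */
public class InterruptedFlushDemo {

    /** Stand-in for waiting on the pipeline ack; in this demo the ack never arrives. */
    static void flushAndWaitForAck(CountDownLatch ack) throws InterruptedIOException {
        try {
            ack.await(); // blocks until acked or interrupted
        } catch (InterruptedException e) {
            InterruptedIOException ioe =
                    new InterruptedIOException("Interrupted while waiting for data to be acknowledged");
            ioe.initCause(e);
            throw ioe;
        }
    }

    /** Runs the simulated sink close in a task thread, cancels it, and returns what it saw. */
    static Throwable cancelDuringFlush() throws InterruptedException {
        CountDownLatch ack = new CountDownLatch(1);      // never counted down
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread task = new Thread(() -> {
            started.countDown();
            try {
                flushAndWaitForAck(ack);                  // like close() -> flush() -> hflush()
            } catch (InterruptedIOException e) {
                failure.set(e);
            }
        });
        task.start();
        started.await();   // the task is blocked (or about to block) in the flush
        task.interrupt();  // an interrupt set before await() also makes await() throw at once
        task.join();
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelDuringFlush());
    }
}
```

Because the wait ends the moment the interrupt arrives, raising an unrelated timeout does not give the flush any extra time.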

Maybe you can check whether HDFS has parameters that make the client wait longer for the ack. 
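If writing a custom sink writer is an option, another generic Java pattern is to keep waiting for the acknowledgement for a bounded grace period even after an interrupt, then restore the interrupt flag so the cancellation still takes effect. The sketch below is hypothetical: the latch stands in for the HDFS ack, and this is neither an HDFS nor a Flink setting. Whether a real hflush can safely be retried after an interrupt should be verified against the HDFS client version in use.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical pattern: keep waiting for an ack for a bounded grace period despite interrupts. */
public class GracefulAckWait {

    /**
     * Waits up to 'grace' for the ack, re-entering the wait if the thread is interrupted,
     * and restores the interrupt flag afterwards so the cancel request is not swallowed.
     * Returns true if the ack arrived in time.
     */
    static boolean awaitAckWithGrace(CountDownLatch ack, long grace, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(grace);
        boolean interrupted = false;
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return ack.getCount() == 0;
                }
                try {
                    return ack.await(remaining, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    interrupted = true; // remember the cancel request, keep waiting
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the interrupt back to the caller
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ack = new CountDownLatch(1);
        Thread acker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                // not expected in this demo
            }
            ack.countDown(); // the simulated pipeline acks after about 200 ms
        });
        acker.start();

        Thread.currentThread().interrupt(); // simulate the cancel interrupt arriving first
        boolean acked = awaitAckWithGrace(ack, 5, TimeUnit.SECONDS);
        // Thread.interrupted() reports the restored flag and clears it before join().
        System.out.println("acked=" + acked + ", interrupt restored=" + Thread.interrupted());
        acker.join();
    }
}
```

Keeping the grace period bounded matters: an unbounded uninterruptible wait would make the task impossible to cancel if the pipeline never acknowledges.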


cheers,
zhijiang
------------------------------------------------------------------
From: Jürgen Thomann <[hidden email]>
Sent: Thursday, April 13, 2017 15:32
To: user <[hidden email]>
Subject: Re: Re: Changing timeout for cancel command

[...]