Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi,

I am running a Flink streaming job on version 1.5.5.

- The job reads from Kafka, windows the data over 2 minutes, and writes
to HDFS using an Avro BucketingSink.
- The job runs with parallelism 132.
- Checkpointing is enabled with an interval of 1 minute.
- A savepoint is triggered every 30 minutes.


A few modified properties:

akka.ask.timeout: 15min
akka.client.timeout: 900s
akka.lookup.timeout: 60s
akka.tcp.timeout: 900s

akka.watch.heartbeat.interval: 120s
akka.watch.heartbeat.pause: 900s

Issue:

The job restarts 3 to 4 times every day, at random times. The log simply
says "attempting to cancel task", with no exception or other logging. I tried setting

log4j.logger.org.apache.flink.runtime.taskmanager.Task=DEBUG,file

but nothing useful is logged.

Enabling DEBUG logging for all of Flink makes the streaming application too
slow, so I cannot do that.

I am attaching the task logs.

task.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/task.gz>  


Thanks
Sohi







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

Stefan Richter-3
Hi,

Could you also provide the job master log?

Best,
Stefan

> On 9. Jan 2019, at 12:02, sohimankotia <[hidden email]> wrote:

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi,

Any update or help, please?




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

Andrey Zagrebin-2
Hi Sohi,

This still looks like the Task Manager logs. Could you post the Job Master logs, please?

Best,
Andrey

On Tue, Jan 15, 2019 at 7:49 AM sohimankotia <[hidden email]> wrote:

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi Andrey,

Please find the logs below. I am attaching Dropbox links because the logs are large.


Job Manager: https://www.dropbox.com/s/q0rd60coydupl6w/full.log.gz?dl=0
Application: https://www.dropbox.com/s/cn3yrd273wd99f2/jm-sohan.log.gz?dl=0


Thanks
Sohi




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

Andrey Zagrebin-2
Hi Sohi,

Something was originally interrupted in DFSOutputStream$DataStreamer.run.
The exception was thrown in the timer callback that processes files in CustomBucketingSink.
The task reported the failure to the JM, and the JM then triggered the job cancellation.

I do not see CustomBucketingSink in the Flink code. Is it one of your application classes?
Does it override BucketingSink.onProcessingTime?

2019-01-10 18:22:45,295 WARN  org.apache.hadoop.hdfs.DFSClient                              - Slow ReadProcessor read fields took 128378ms (threshold=30000ms); ack: seqno: 10 reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 457753 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[192.168.3.180:50010,DS-92b67356-e83f-410e-aeb4-e1f58b6cc69a,DISK], DatanodeInfoWithStorage[192.168.3.185:50010,DS-0dcac37b-4832-4b4e-b167-70762a3c6f34,DISK]]
2019-01-10 18:22:45,300 DEBUG org.apache.flink.streaming.connectors.fs.CustomBucketingSink  - Moving in-progress bucket hdfs:/new_data_pipeline/prod/phase1/aggregated-data/item_agg/20190110/18/20/_part-1547124600014-9-0.in-progress to pending file hdfs:/new_data_pipeline/prod/phase1/aggregated-data/item_agg/20190110/18/20/_part-1547124600014-9-0.pending
2019-01-10 18:22:45,309 WARN  org.apache.hadoop.hdfs.DFSClient                              - DataStreamer Exception
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:478)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:618)
2019-01-10 18:22:45,319 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally item_agg-avro -> Sink: item_agg (2/20) (3b85714e145ca9f6760757c6fb2203bb).
2019-01-10 18:22:45,319 INFO  org.apache.flink.runtime.taskmanager.Task                     - item_agg-avro -> Sink: item_agg (2/20) (3b85714e145ca9f6760757c6fb2203bb) switched from RUNNING to FAILED.
TimerException{java.nio.channels.ClosedByInterruptException}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:478)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:618)
2019-01-10 18:22:45,483 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code item_agg-avro -> Sink: item_agg (2/20) (3b85714e145ca9f6760757c6fb2203bb).
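
The pattern in this trace, a timer callback failing with a wrapped ClosedByInterruptException, can be reproduced outside Flink. The following is a minimal plain-Java sketch, assuming only that the write runs on a thread whose interrupt flag is set. It writes to a local FileChannel rather than an HDFS socket, and the class name TimerInterruptDemo and the RuntimeException wrapper are hypothetical stand-ins, not Flink or Hadoop code.

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names come from Flink or Hadoop.
final class TimerInterruptDemo {

    /** Runs one timer callback that writes on an interrupted thread and returns the reported failure. */
    static Throwable runOnce() throws Exception {
        Path file = Files.createTempFile("timer-interrupt-demo", ".bin");
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicReference<Throwable> reported = new AtomicReference<>();
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            timer.schedule(() -> {
                try {
                    // Simulate an interrupt that arrives from some other component.
                    Thread.currentThread().interrupt();
                    // A channel write on an interrupted thread closes the channel
                    // and throws ClosedByInterruptException.
                    channel.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                } catch (Throwable t) {
                    // Stand-in for the wrapping and reporting a timer service would do.
                    reported.set(new RuntimeException("timer callback failed", t));
                } finally {
                    Thread.interrupted(); // clear the flag so the executor thread stays usable
                }
            }, 10, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } finally {
            timer.shutdownNow();
            Files.deleteIfExists(file);
        }
        return reported.get();
    }

    public static void main(String[] args) throws Exception {
        Throwable failure = runOnce();
        System.out.println(failure + " / cause: " + failure.getCause());
    }
}
```

The sketch says nothing about who interrupted the thread in the job above. It only shows that an interrupt during a channel write closes the channel and surfaces inside the timer callback as this exception.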

I will also cc Kostas and Aljoscha; maybe they can help.

Best,
Andrey

On Wed, Jan 16, 2019 at 1:37 PM sohimankotia <[hidden email]> wrote:

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi Andrey,

Yes, CustomBucketingSink is a custom class copied from BucketingSink itself.

A few changes were added:

1. A timestamp added to the part files
2. A few logging statements

Note: It looks like I copied it from version 1.4 (I don't know whether that
could be the reason for the failure).

"Did it override BucketingSink.onProcessingTime?" - It is the same as in the
original BucketingSink.

I am attaching the Java file. I also added the comment "// Added by Sohi", so
you can search for it to see the changes I made.

File: CustomBucketingSink.java



Thanks
Sohi





Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi,

Yes, the issue is with the BucketingSink. I removed it and replaced it with a
Kafka sink, and the job worked fine.

What could be causing this?

TimerException{java.nio.channels.ClosedByInterruptException}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)


Thanks
Sohi




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi Team,

Any help or update on this?

This is still an issue in production, where I am using the bucketing sink.



Thanks
Sohi





Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

Erik van Oosten
Hi sohimankotia,

My advice, from also having had to subclass BucketingSink:

* rebase your changes on the BucketingSink that comes with the Flink
version you are using
* use the same super completely ugly hack I had to deploy as described
here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-BucketingSink-when-cancelling-Flink-job-td15856.html#a16168
* consider using the successor of BucketingSink: StreamingFileSink

Good luck,
     Erik.


On 27-01-19 at 10:13, sohimankotia wrote:


Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Hi Erik,

Are you suggesting that I apply all of these options together?

Which version of Flink has this fixed? I am currently using 1.5.5.

-Thanks
Sohi




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

Andrey Zagrebin-3
Hi Sohi,

I would also recommend trying the newer StreamingFileSink which is available in Flink 1.7.x [1].

Best,

On Sun, Feb 24, 2019 at 4:14 AM sohimankotia <[hidden email]> wrote:

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
Thanks, Andrey.

Yes, I will upgrade and see whether the same issue reproduces.


-Sohi




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
In reply to this post by Andrey Zagrebin-3
Hi Andrey,

I am using AvroSinkWriter (with BucketingSink) with compression enabled.
It looks like StreamingFileSink does not have direct support for
AvroSinkWriter. The sequence file format is available for StreamingFileSink,
but it looks like it rolls files on every checkpoint (OnCheckpointRollingPolicy),
which can create lots of small files, as we have checkpointing enabled every
2 minutes.



Is the following issue still present in version 1.7.2?

(d9d0f719652f87dfb2ec663b46ef2e47) switched from RUNNING to FAILED.
TimerException{java.nio.channels.ClosedByInterruptException}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:478)
        at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
        at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:618)


Thanks
Sohi




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

sohimankotia
In reply to this post by Erik van Oosten
Hi Erik,

I am still not able to understand the reason behind this exception.

Is this exception causing the failure and restart of the job, or does it occur
after the failure/restart has already been triggered?


Thanks
Sohi




Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

Andrey Zagrebin-3
Hi Sohi,

There seems to be no Avro implementation of the Encoder interface used in StreamingFileSink, but it could probably be implemented on top of AvroKeyValueWriter without much effort.

There is also DefaultRollingPolicy, which rolls part files based on time and part-file size. It might create temporary files for unfinished results at each checkpoint, but these do not end up in the final result.
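
To picture how a threshold-based rolling policy avoids producing one file per checkpoint, here is a plain-Java sketch. It is not Flink's DefaultRollingPolicy: the class name, the thresholds, and the choice of part size plus two time limits as roll criteria are all assumptions made for illustration.

```java
// Hypothetical illustration; this is NOT Flink's DefaultRollingPolicy class.
final class SizeAndTimeRollingSketch {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    SizeAndTimeRollingSketch(long maxPartSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    /** Decides whether the in-progress part file should be closed and a new one started. */
    boolean shouldRoll(long partSizeBytes, long createdAtMs, long lastWriteAtMs, long nowMs) {
        boolean tooBig = partSizeBytes >= maxPartSizeBytes;          // part file reached the size limit
        boolean tooOld = nowMs - createdAtMs >= rolloverIntervalMs;  // part file has been open too long
        boolean idle = nowMs - lastWriteAtMs >= inactivityIntervalMs; // nothing written recently
        return tooBig || tooOld || idle;
    }

    public static void main(String[] args) {
        // Illustrative thresholds: 128 MiB, 15 minutes, 5 minutes.
        SizeAndTimeRollingSketch policy =
                new SizeAndTimeRollingSketch(128L * 1024 * 1024, 15 * 60_000L, 5 * 60_000L);
        long now = 20 * 60_000L;
        // Small, recently created, recently written part file: keep writing.
        System.out.println(policy.shouldRoll(1024, now - 2 * 60_000L, now - 1_000L, now));  // false
        // Part file older than the rollover interval: roll.
        System.out.println(policy.shouldRoll(1024, now - 16 * 60_000L, now - 1_000L, now)); // true
    }
}
```

With limits like these set well above the checkpoint interval, a part file would span many checkpoints instead of being closed at each one.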

Best,
Andrey

On Mon, Feb 25, 2019 at 1:33 PM sohimankotia <[hidden email]> wrote: