Hi,
I am running a Flink Streaming job on version 1.5.5.

- The job is basically reading from Kafka, windowing on 2 minutes, and writing to HDFS using an Avro bucketing sink.
- The job is running with parallelism 132.
- Checkpointing is enabled with an interval of 1 minute.
- Savepoints are enabled and triggered every 30 minutes.

A few modified properties:

akka.ask.timeout: 15min
akka.client.timeout: 900s
akka.lookup.timeout: 60s
akka.tcp.timeout: 900s
akka.watch.heartbeat.interval: 120s
akka.watch.heartbeat.pause: 900s

Issue:

The job is getting restarted 3 to 4 times every day (at random times). It simply says "attempting to cancel task", with no exception or further logging. I tried to set

log4j.logger.org.apache.flink.runtime.taskmanager.Task=DEBUG,file

but nothing important is getting logged. Enabling DEBUG logging at the Flink level slows the streaming application down, so I cannot do that.

Attaching task logs: task.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/task.gz>

Thanks
Sohi

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
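For reference, a setup like the one described above could be wired roughly as follows against the Flink 1.5 DataStream API. This is only a sketch: the broker address, topic, consumer group, key extractor, aggregation, and HDFS path are placeholders, not taken from the actual job.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaToHdfsJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(132);          // as described in the thread
        env.enableCheckpointing(60_000L); // checkpoint every minute

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "item-agg");             // placeholder

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), props));

        DataStream<String> aggregated = events
                .keyBy(e -> e) // placeholder key extractor
                .window(TumblingProcessingTimeWindows.of(Time.minutes(2)))
                .reduce((a, b) -> a); // placeholder aggregation

        // The thread uses a custom subclass of BucketingSink with an Avro writer;
        // the stock sink is shown here.
        aggregated.addSink(new BucketingSink<>("hdfs:///aggregated-data")); // placeholder path

        env.execute("kafka-to-hdfs");
    }
}
```

This fragment cannot run without a Flink cluster and Kafka/HDFS endpoints; it is shown only to fix the shape of the pipeline being discussed.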
Hi,
Could you also provide the job master log?

Best,
Stefan
Hi Stefan,
Attaching logs. You can search for "2019-01-09 19:34:44,170 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: " in the first 2 log files.

f3-part-aa.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/f3-part-aa.gz>
f3-part-ab.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/f3-part-ab.gz>
f3-part-ac.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/f3-part-ac.gz>
f3-part-ad.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/f3-part-ad.gz>
f3-part-ae.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/f3-part-ae.gz>
f3-part-af.gz <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t894/f3-part-af.gz>
Hi,

Any update/help, please?
Hi Sohi,

This still looks like Task Manager logs; could you post the Job Master logs, please?

Best,
Andrey
Hi Andrey ,
Please find the logs. Attaching Dropbox links as the logs are large.

Job Manager: https://www.dropbox.com/s/q0rd60coydupl6w/full.log.gz?dl=0
Application: https://www.dropbox.com/s/cn3yrd273wd99f2/jm-sohan.log.gz?dl=0

Thanks
Sohi
Hi Sohi,

Something was originally interrupted in DFSOutputStream$DataStreamer.run. It was thrown in the timer callback which processed files in CustomBucketingSink. The task reported the failure to the JM, and the JM then triggered job cancellation.

I do not see this CustomBucketingSink in the Flink code. Is it one of your application classes? Did it override BucketingSink.onProcessingTime?

2019-01-10 18:22:45,295 WARN org.apache.hadoop.hdfs.DFSClient - Slow ReadProcessor read fields took 128378ms (threshold=30000ms); ack: seqno: 10 reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 457753 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[192.168.3.180:50010,DS-92b67356-e83f-410e-aeb4-e1f58b6cc69a,DISK], DatanodeInfoWithStorage[192.168.3.185:50010,DS-0dcac37b-4832-4b4e-b167-70762a3c6f34,DISK]]
2019-01-10 18:22:45,300 DEBUG org.apache.flink.streaming.connectors.fs.CustomBucketingSink - Moving in-progress bucket hdfs:/new_data_pipeline/prod/phase1/aggregated-data/item_agg/20190110/18/20/_part-1547124600014-9-0.in-progress to pending file hdfs:/new_data_pipeline/prod/phase1/aggregated-data/item_agg/20190110/18/20/_part-1547124600014-9-0.pending
2019-01-10 18:22:45,309 WARN org.apache.hadoop.hdfs.DFSClient - DataStreamer Exception
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:478)
    at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:618)
2019-01-10 18:22:45,319 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally item_agg-avro -> Sink: item_agg (2/20) (3b85714e145ca9f6760757c6fb2203bb).
2019-01-10 18:22:45,319 INFO org.apache.flink.runtime.taskmanager.Task - item_agg-avro -> Sink: item_agg (2/20) (3b85714e145ca9f6760757c6fb2203bb) switched from RUNNING to FAILED.
TimerException{java.nio.channels.ClosedByInterruptException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:478)
    at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:618)
2019-01-10 18:22:45,483 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code item_agg-avro -> Sink: item_agg (2/20) (3b85714e145ca9f6760757c6fb2203bb).

I will also cc Kostas and Aljoscha; maybe they could help.

Best,
Andrey
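As background on the mechanism in the trace above: when a thread is blocked in an I/O operation on an interruptible java.nio channel and is interrupted, the channel is closed and the operation fails with ClosedByInterruptException. A JDK-only sketch (no HDFS involved; an in-memory Pipe stands in for the DataNode socket, and the interrupt plays the role of Flink's task cancellation):

```java
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;

public class ClosedByInterruptDemo {

    /** Blocks a writer thread on a full NIO pipe, interrupts it, and returns the exception it saw. */
    static String interruptBlockedWriter() throws Exception {
        Pipe pipe = Pipe.open(); // in-memory channel pair; nothing reads, so writes eventually block
        final String[] caught = new String[1];
        Thread writer = new Thread(() -> {
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
            try {
                while (true) {
                    buf.clear();
                    pipe.sink().write(buf); // blocks once the pipe's internal buffer fills
                }
            } catch (ClosedByInterruptException e) {
                caught[0] = "ClosedByInterruptException";
            } catch (Exception e) {
                caught[0] = e.getClass().getSimpleName();
            }
        });
        writer.start();
        Thread.sleep(500);  // give the writer time to fill the pipe and block
        writer.interrupt(); // analogous to Flink interrupting the task thread on cancellation
        writer.join();
        return caught[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptBlockedWriter());
    }
}
```

This is consistent with the log above: cancellation interrupts the task thread while the timer callback is flushing to HDFS, and the interrupted socket write surfaces as ClosedByInterruptException inside the DFSClient.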
Hi Andrey,
Yes. CustomBucketingSink is a custom class copied from BucketingSink itself. A few changes were added:

1. Add a timestamp in part files.
2. A few logging statements.

Note: it looks like I copied it from version 1.4 (I don't know if that could be the reason for the failure).

Did it override BucketingSink.onProcessingTime? - It is the same as in the original BucketingSink.

Attaching the Java file. I also added the comment "// Added by Sohi"; you can search for it and see the changes I have made.

File: CustomBucketingSink.java

Thanks
Sohi
Hi,

Yes, the issue is with the BucketingSink. I removed it and replaced the sink with a Kafka sink, and the job worked fine.

What could be causing:

TimerException{java.nio.channels.ClosedByInterruptException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)

Thanks
Sohi
Hi Team,
Any help/update on this? This is still an issue; I am using the bucketing sink in production.

Thanks
Sohi
Hi sohimankotia,
My advice from also having had to sub-class BucketingSink:

* Rebase your changes on the BucketingSink that comes with the Flink version you are using.
* Use the same super completely ugly hack I had to deploy, as described here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Exception-in-BucketingSink-when-cancelling-Flink-job-td15856.html#a16168
* Consider using the successor of BucketingSink: StreamingFileSink.

Good luck,
Erik.
Hi Erik,
Are you suggesting all options together? Which version of Flink has this solved? I am currently using 1.5.5.

Thanks
Sohi
Hi Sohi,

I would also recommend trying the newer StreamingFileSink, which is available in Flink 1.7.x [1].

Best,
Thanks Andrey.

Yeah, will upgrade and see if the same gets reproduced.

-Sohi
In reply to this post by Andrey Zagrebin-3
Hi Andrey,
I am using AvroSinkWriter (with BucketingSink) with compression enabled. It looks like StreamingFileSink does not have direct support for AvroSinkWriter.

A SequenceFile format is there for StreamingFileSink, but it looks like it rolls files on every checkpoint (OnCheckpointRollingPolicy), which can create lots of small files, as we have checkpointing enabled every 2 minutes.

I would also like to know if the following issue is still there in version 1.7.2:

(d9d0f719652f87dfb2ec663b46ef2e47) switched from RUNNING to FAILED.
TimerException{java.nio.channels.ClosedByInterruptException}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:478)
    at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:618)

Thanks
Sohi
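To put a rough number on the small-files concern above: with a policy that rolls on every checkpoint, each sink subtask can finish at most one part file per checkpoint interval. A back-of-the-envelope sketch, assuming every subtask receives data in every interval and the sink runs at the job parallelism of 132:

```java
public class SmallFilesEstimate {

    /** Worst-case part files per hour: one rolled file per subtask per checkpoint. */
    static long filesPerHour(long checkpointIntervalMinutes, long sinkParallelism) {
        long checkpointsPerHour = 60 / checkpointIntervalMinutes;
        return checkpointsPerHour * sinkParallelism;
    }

    public static void main(String[] args) {
        // 2-minute checkpoints and parallelism 132, as in this thread
        System.out.println(filesPerHour(2, 132)); // prints 3960
    }
}
```

That is, on the order of a few thousand HDFS files per hour in the worst case, which is why a time/size-based rolling policy can be preferable here.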
In reply to this post by Erik van Oosten
Hi Erik,
I am still not able to understand the reason behind this exception. Is this exception causing the failure and restart of the job, or does it occur after the failure/restart is triggered?

Thanks
Sohi
Hi Sohi,

There seem to be no Avro implementations of the Encoder interface used in StreamingFileSink, but maybe one could be implemented based on AvroKeyValueWriter without too big an effort.

There is also a DefaultRollingPolicy which is based on time and number of records. It might create temporary files for unfinished results per checkpoint, but not in the final result.

Best,
Andrey
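A hedged sketch of what is described above, against the Flink 1.7 StreamingFileSink API. The output path is a placeholder, and the newline-delimited string Encoder stands in for a hand-written Avro Encoder; exact builder method names may differ slightly between minor versions.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class StreamingFileSinkSketch {

    static StreamingFileSink<String> build() {
        // Placeholder row Encoder; an Avro-based Encoder would go here instead.
        Encoder<String> encoder = (element, stream) -> {
            stream.write(element.getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
        };

        return StreamingFileSink
                .forRowFormat(new Path("hdfs:///aggregated-data"), encoder) // placeholder path
                .withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // roll by time...
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(128L * 1024 * 1024)                 // ...and size, not per checkpoint
                        .build())
                .build();
    }
}
```

With DefaultRollingPolicy, part files roll when they exceed the configured size, age, or inactivity interval rather than on every checkpoint, which addresses the small-files concern raised earlier in the thread.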