Hi,
I have encountered a weird problem. Several days after I started the job, Flink gave me the following error:

java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='n/a', partition=-1}, KafkaPartitionHandle=[n/a,-1], offset=(not set)]
    at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:495)
    at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:205)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

The Flink job died after this error and tried to restart, but ultimately in vain.

Is there any reason why Flink was unable to find a leader for the partition? A more confusing question is why it is trying to find the topic 'n/a' instead of the topic we specified.

Thanks in advance!

Best regards,
Mu
Hi Mu,
Which version of Flink are you using? I checked the latest branches for 1.2 to 1.5, looking for the call to findLeaderForPartitions at line 205 of Kafka08Fetcher, but none of them matched.

From what I can see in the code, there is a MARKER partition state with topic "n/a", but it is explicitly removed from the list of partitions to find leaders for and is used solely when cancelling the fetcher. I don't know whether this is possible, but I suppose there could be more than one marker, in which case we should call removeAll() instead. @Gordon, can you elaborate on (or check) whether this could happen?

Nico

On 06/03/18 12:51, Mu Kong wrote:
> Hi,
Hi Nico,

Thanks for your prompt response. I'm using Flink 1.3.0 for this job. Please let me know if you need more information.

Best regards,
Mu

On Tue, Mar 6, 2018 at 10:17 PM, Nico Kruber wrote:
> Hi Mu,
Hi Mu,
You mentioned that the job stopped after the "n/a" topic error and then failed to recover. What exception did you encounter in the restart attempts? Was it the same error? That would tell us whether we should actually be removing more than one of these special MARKER partition states.

On the other hand, if I recall correctly, the Kafka consumer in 1.3.0 had a severe bug that could lead to duplicate data; it was fixed in 1.3.2. Though I don't think it is related to the error you encountered, I strongly recommend that you use 1.3.2 instead.

Cheers,
Gordon

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Hi Gordon,

Thanks for your response. I think I misspoke about the failure after the "n/a" exception. The behavior after this exception was:

switched from RUNNING to CANCELING
switched from CANCELING to CANCELED
Try to restart or fail the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx) if no longer possible.
switched from state FAILING to RESTARTING
Restarting the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx)
Recovering checkpoints from ZooKeeper
Found 1 checkpoints in ZooKeeper
Trying to retrieve checkpoint 1091
Restoring from latest valid checkpoint: Checkpoint 1091 @ xxxxxxxxxxxxxxxxxxxx for xxxxxxxxxxxxxxxxxxxx
switched from CREATED to SCHEDULED
switched from SCHEDULED to DEPLOYING
switched from DEPLOYING to RUNNING
(several checkpoints)
switched from RUNNING to FAILED
TimerException{java.io.EOFException:Premature EOF: no length prefix available}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)

Since there were several successful checkpoints after the restart, I think the later failure might be something else.

Also, could you please share more information about the MARKER in the code? For example, which piece of code should I look at?

And thanks for the suggestion to upgrade Flink to 1.3.2.

Best regards,
Mu

On Wed, Mar 7, 2018 at 3:04 PM, Tzu-Li Tai wrote:
> Hi Mu,
I think I found a code path (a race between threads) that may lead to two markers being in the list. I created https://issues.apache.org/jira/browse/FLINK-8896 to track this and will (probably) have a pull request ready today.

Nico

On 07/03/18 10:09, Mu Kong wrote:
> Hi Gordon,
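Nico's hypothesis rests on standard java.util.List semantics: remove(Object) deletes only the first matching element, whereas removeAll(Collection) deletes every match. The Java sketch below illustrates why a second MARKER surviving a single remove() call would leave an entry with topic 'n/a' and partition -1 in the list handed to the leader lookup. PartitionState, MARKER and the "my-topic" entry are hypothetical stand-ins, not Flink's actual Kafka08Fetcher types, and the assumption that the fetcher strips the marker with a single remove() call is taken from Nico's suggestion rather than from the Flink source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MarkerRemovalDemo {

    // Hypothetical stand-in for a Kafka partition state; NOT Flink's actual class.
    static final class PartitionState {
        final String topic;
        final int partition;

        PartitionState(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "KafkaTopicPartition{topic='" + topic + "', partition=" + partition + "}";
        }
    }

    // One shared sentinel instance, analogous to the "n/a" / -1 MARKER (assumption).
    static final PartitionState MARKER = new PartitionState("n/a", -1);

    // Builds a list in which the marker appears twice,
    // e.g. because two racing threads each enqueued it (assumption).
    static List<PartitionState> buildListWithTwoMarkers() {
        List<PartitionState> list = new ArrayList<>();
        list.add(new PartitionState("my-topic", 0));
        list.add(MARKER);
        list.add(MARKER);
        return list;
    }

    public static void main(String[] args) {
        List<PartitionState> viaRemove = buildListWithTwoMarkers();
        // List.remove(Object) drops only the FIRST matching element,
        // so one marker is still left for the leader lookup.
        viaRemove.remove(MARKER);
        System.out.println("after remove():    " + viaRemove);

        List<PartitionState> viaRemoveAll = buildListWithTwoMarkers();
        // removeAll drops EVERY element contained in the given collection.
        viaRemoveAll.removeAll(Collections.singleton(MARKER));
        System.out.println("after removeAll(): " + viaRemoveAll);
    }
}
```

Running main prints a list that still contains the 'n/a' entry after remove() and one without it after removeAll(), which matches the shape of the partition reported in the original exception.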