Flink is looking for Kafka topic "n/a"

Flink is looking for Kafka topic "n/a"

Mu Kong
Hi,

I have encountered a weird problem.
After the job had been running for several days, Flink gave me the following error:

java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='n/a', partition=-1}, KafkaPartitionHandle=[n/a,-1], offset=(not set)]
        at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:495)
        at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:205)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

The Flink job died after this error; it tried to restart but ultimately failed.

Is there any reason why Flink was unable to find a leader for the partition?
Even more confusing: why is it looking for topic 'n/a' instead of the topic we specified?

Thanks in advance!

Best regards,
Mu
Re: Flink is looking for Kafka topic "n/a"

Nico Kruber
Hi Mu,
Which version of Flink are you using? I checked the latest branches for
1.2 - 1.5, looking for the findLeaderForPartitions call at line 205 in
Kafka08Fetcher, but the line numbers did not match. From what I can see in
the code, there is a MARKER partition state with topic "n/a", but it is
explicitly removed from the list of partitions to find leaders for and is
used solely while cancelling the fetcher.

I don't know whether this is possible, but I suppose there could be more
than one marker and we should call removeAll() instead - @Gordon, can
you elaborate/check whether this could happen?
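
To make the idea concrete, here is a minimal, self-contained sketch; the Partition class and all names below are made up for illustration, this is not the actual Kafka08Fetcher code (which uses something like KafkaTopicPartitionState). With two markers in the list, a single remove() leaves one "n/a" entry behind, while removeAll() strips every occurrence:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class MarkerSketch {

    // Stand-in for the per-partition state; made up for this example.
    static class Partition {
        final String topic;
        final int id;

        Partition(String topic, int id) {
            this.topic = topic;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Partition
                    && ((Partition) o).topic.equals(topic)
                    && ((Partition) o).id == id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, id);
        }

        @Override
        public String toString() {
            return topic + "-" + id;
        }
    }

    // Sentinel used only to wake up / cancel the fetch loop, never a real partition.
    static final Partition MARKER = new Partition("n/a", -1);

    public static void main(String[] args) {
        List<Partition> partitionsToFindLeadersFor = new ArrayList<>();
        partitionsToFindLeadersFor.add(new Partition("my-topic", 0));

        // Suppose two markers ended up in the list:
        partitionsToFindLeadersFor.add(MARKER);
        partitionsToFindLeadersFor.add(MARKER);

        // remove(Object) strips only the first occurrence, so one "n/a" entry
        // survives and would later show up in the "Unable to find a leader" error.
        partitionsToFindLeadersFor.remove(MARKER);
        System.out.println("after remove():    " + partitionsToFindLeadersFor);

        // removeAll(...) strips every occurrence, leaving only real partitions.
        partitionsToFindLeadersFor.removeAll(Collections.singleton(MARKER));
        System.out.println("after removeAll(): " + partitionsToFindLeadersFor);
    }
}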


Nico

Re: Flink is looking for Kafka topic "n/a"

Mu Kong
Hi Nico,

Thanks for your prompt response.
I'm using Flink 1.3.0 for this job.

Please let me know if you need more information.


Best regards,
Mu

Re: Flink is looking for Kafka topic "n/a"

Tzu-Li Tai
Hi Mu,

You mentioned that the job stopped after the "n/a" topic error and that it
then failed to recover.
What exception did you encounter in the restarted executions? Was it the same
error?
This would help verify whether we actually should be removing more than one
of these special MARKER partition states.

On the other hand, if I recall correctly, the Kafka consumer had a severe
bug in 1.3.0 that could lead to duplicate data; it was fixed in 1.3.2.
Though I don't think it is related to the error you encountered, I strongly
recommend upgrading to 1.3.2.

Cheers,
Gordon



Re: Flink is looking for Kafka topic "n/a"

Mu Kong
Hi Gordon,

Thanks for your response.
I think I misspoke about the failure after the "n/a" exception.
The behavior after this exception was:

switched from RUNNING to CANCELING
switched from CANCELING to CANCELED
Try to restart or fail the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx) if no longer possible.
switched from state FAILING to RESTARTING
Restarting the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx)
Recovering checkpoints from ZooKeeper
Found 1 checkpoints in ZooKeeper
Trying to retrieve checkpoint 1091
Restoring from latest valid checkpoint: Checkpoint 1091 @ xxxxxxxxxxxxxxxxxxxx for xxxxxxxxxxxxxxxxxxxx
switched from CREATED to SCHEDULED
switched from SCHEDULED to DEPLOYING
switched from DEPLOYING to RUNNING
(several successful checkpoints)
switched from RUNNING to FAILED
TimerException{java.io.EOFException:Premature EOF: no length prefix available}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)

Since there were several successful checkpoints after the restart, I think the later failure might be something else.
Also, could you please share more information about the MARKER in the code? For example, which piece of code should I look at?

And thanks for the suggestion to upgrade Flink to 1.3.2.

Best regards,
Mu



Re: Flink is looking for Kafka topic "n/a"

Nico Kruber
I think I found a code path (a race between threads) that may lead to two
markers being in the list.

I created https://issues.apache.org/jira/browse/FLINK-8896 to track this
and will have a pull request ready (probably) today.
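
Roughly what I mean, as a purely illustrative sketch (the names are made up, this is not the actual fetcher code): if two threads both decide to wake the fetch loop up and each add the marker, removing only a single marker afterwards leaves an "n/a" entry in the list.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MarkerRaceSketch {

    // Stand-in for the sentinel partition state with topic "n/a".
    static final String MARKER = "n/a";

    public static void main(String[] args) throws InterruptedException {
        List<String> pending = new CopyOnWriteArrayList<>();
        pending.add("my-topic-0");

        // Two threads each conclude that the fetch loop has to be woken up
        // (e.g. a cancel call racing with a shutting-down broker thread) and
        // both push the marker.
        Runnable wakeUp = () -> pending.add(MARKER);
        Thread t1 = new Thread(wakeUp);
        Thread t2 = new Thread(wakeUp);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // The consuming side removes only a single marker ...
        pending.remove(MARKER);

        // ... so one "n/a" entry is still in the list and later trips up the
        // leader lookup.
        System.out.println(pending);
    }
}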


Nico
