Kafka Producer Partitioning issue

Gyula Fóra
Hi,

I have run into a strange issue when using the Kafka producer.

I got the following exception:
Caused by: java.lang.IllegalArgumentException: Invalid partition given with record: 5 is not in the range [0...2].
	at org.apache.kafka.clients.producer.internals.Partitioner.partition(Partitioner.java:52)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:333)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:260)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
	... 19 more

It looks like the partition number returned by the partitioner is out of range, which should be practically impossible, as I am using very straightforward mod-hash partitioning logic:

return (int) Math.abs(element.hashCode() % numPartitions);
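Here is a minimal sketch of that mod-hash logic in a standalone class, showing why an out-of-range index looked impossible. The class and method names (`PartitionDemo`, `partitionFor`, `absFirst`) are hypothetical. Because `%` is applied before `Math.abs`, the remainder lies strictly between `-numPartitions` and `numPartitions`, so the result always falls in `[0, numPartitions)`. The `(int)` cast in the original line is harmless but unnecessary, since `Math.abs(int)` already returns an `int`. Applying `Math.abs` first is the classic pitfall, because `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
// Sketch of the mod-hash partitioning from the post.
// PartitionDemo, partitionFor and absFirst are hypothetical names.
public class PartitionDemo {

    // Mod first, then abs: hashCode() % n is in (-n, n), so the
    // absolute value is always in [0, n), even for Integer.MIN_VALUE.
    public static int partitionFor(Object element, int numPartitions) {
        return Math.abs(element.hashCode() % numPartitions);
    }

    // Abs first (the pitfall): Math.abs(Integer.MIN_VALUE) overflows and
    // stays negative, so the result can be a negative partition index.
    public static int absFirst(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int[] hashes = {0, 5, -5, Integer.MAX_VALUE, Integer.MIN_VALUE};
        for (int h : hashes) {
            // Integer.hashCode() returns the int value itself.
            System.out.println(h + " -> " + partitionFor(h, 16));
        }
        System.out.println("mod first: " + partitionFor(Integer.MIN_VALUE, 3));
        System.out.println("abs first: " + absFirst(Integer.MIN_VALUE, 3));
    }
}
```

So the logic itself can only go out of range if the `numPartitions` it is given does not match the partition count used for validation.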

This topic is supposed to have 16 partitions, but it currently shows only 3, hence the out-of-bounds error from the partitioner. I am not completely sure what happened in the Kafka cluster that could have caused this.
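The failure mode can be reproduced in isolation with a hedged sketch, assuming the partitioner computed its index against the expected 16 partitions while the producer validated it against the 3 partitions it currently saw. The post does not say where each count comes from, so that split is an assumption. The range check below is a simplified stand-in modeled on the exception message in the stack trace, not Kafka's actual code, and `PartitionMismatchDemo`, `choosePartition` and `checkPartition` are hypothetical names.

```java
// Sketch: the index is computed from one partition count (16) but
// validated against another (3). All names here are hypothetical.
public class PartitionMismatchDemo {

    // Same mod-hash logic as in the post.
    public static int choosePartition(Object element, int numPartitions) {
        return Math.abs(element.hashCode() % numPartitions);
    }

    // Simplified stand-in for the producer-side range check; the message
    // format mirrors the one seen in the stack trace.
    public static void checkPartition(int partition, int actualPartitions) {
        if (partition < 0 || partition >= actualPartitions) {
            throw new IllegalArgumentException(
                "Invalid partition given with record: " + partition
                + " is not in the range [0..." + (actualPartitions - 1) + "].");
        }
    }

    public static void main(String[] args) {
        int expectedPartitions = 16; // what the topic should have
        int reportedPartitions = 3;  // what the (invalid) metadata reported
        Integer element = 5;         // Integer.hashCode() returns the value itself
        int partition = choosePartition(element, expectedPartitions); // 5
        try {
            checkPartition(partition, reportedPartitions);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the same message as the exception above: a perfectly valid index for 16 partitions is rejected once only 3 are known.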

The issue also appears if I restore from a savepoint/checkpoint taken before this happened: the job runs for a couple of minutes and then crashes. (I am also not completely sure why it doesn't crash immediately.)

Has anyone seen something similar before?

Thanks!

Gyula

Re: Kafka Producer Partitioning issue

rmetzger0
Hi,

Gyula and I had an offline discussion about this issue. We'll report back here once we've found the cause.

In reply to Gyula Fóra, Wed, Jul 6, 2016 at 12:01 AM.

Re: Kafka Producer Partitioning issue

Gyula Fóra
Hi,

After some debugging, we found that this was actually a problem with two of our Kafka brokers, which for some reason held invalid metadata for this specific topic. We are still investigating how this could happen, but the point is that nothing seems to be wrong with the Flink Kafka producer :)

Cheers,
Gyula

In reply to Robert Metzger, Fri, Jul 8, 2016 at 13:34.