FlinkKafkaConsumer and multiple topics

Jakob Ericsson
Hi,

Would it be possible to get the FlinkKafkaConsumer to support multiple topics, like a list?

Or would it be better to instantiate one FlinkKafkaConsumer per topic and add each as a source?
We have about 40-50 topics to listen to in one job.
Or, even better, could it accept a regexp pattern that defines the topics? That would mean querying ZK to get information about the topics.

Kind regards,
Jakob
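
A minimal sketch of the regexp idea from the question above, assuming the full list of topic names has already been fetched (for example from ZooKeeper). The class name, method name, and topic names are hypothetical and only illustrate the filtering step; nothing here is part of the FlinkKafkaConsumer API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternFilter {

    /** Returns the topic names that fully match the given regular expression. */
    static List<String> matchingTopics(List<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names; in practice they would be read from ZooKeeper.
        List<String> allTopics = Arrays.asList("events.click", "events.view", "audit.login");
        System.out.println(matchingTopics(allTopics, "events\\..*"));
    }
}
```

The resulting list could then be used to decide which topics to create consumers for.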

Re: FlinkKafkaConsumer and multiple topics

rmetzger0
Hi Jakob,

Currently, it's not possible to subscribe to multiple topics with one FlinkKafkaConsumer.

So for now, you have to create one FlinkKafkaConsumer (FKC) per topic, so you'll end up with 50 sources.

As soon as Kafka releases the new consumer, it will support subscribing to multiple topics (I think even with pattern support), and we can easily expose those APIs in the FlinkKafkaConsumer as well.
Kafka plans to release the new consumer API in October, as you can see here: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
As soon as the new API is out, we'll support it.

I hope this solution is okay for you. If not, please let me know ;)


Robert
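
The one-consumer-per-topic workaround described above could be sketched roughly as follows. This is only an illustration under assumptions: the helper `unionPerTopic` is hypothetical, and the type parameter `S` stands in for the stream type. In a real Flink job, `createSource` would wrap a FlinkKafkaConsumer in `env.addSource(...)` and `union` would combine two DataStreams; here plain lists are used so that the sketch runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class PerTopicSources {

    /**
     * Creates one source per topic and folds them into a single combined stream.
     * S is a stand-in for the stream type of the job.
     */
    static <S> S unionPerTopic(List<String> topics,
                               Function<String, S> createSource,
                               BinaryOperator<S> union) {
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("at least one topic is required");
        }
        S combined = createSource.apply(topics.get(0));
        for (String topic : topics.subList(1, topics.size())) {
            combined = union.apply(combined, createSource.apply(topic));
        }
        return combined;
    }

    public static void main(String[] args) {
        // Stand-in "streams": each source is just a list holding its topic name.
        List<String> topics = Arrays.asList("topic-a", "topic-b", "topic-c");
        List<String> merged = PerTopicSources.<List<String>>unionPerTopic(
                topics,
                topic -> new ArrayList<>(Collections.singletonList(topic)),
                (left, right) -> {
                    List<String> both = new ArrayList<>(left);
                    both.addAll(right);
                    return both;
                });
        System.out.println(merged);
    }
}
```

With 40-50 topics this yields 40-50 source operators in the job graph, which is the cost of the workaround mentioned above.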


Re: FlinkKafkaConsumer and multiple topics

Jakob Ericsson
That will work. We have some utility classes for exposing the ZK-info.


Re: FlinkKafkaConsumer and multiple topics

Jakob Ericsson
I hit another problem. It is probably related to a topic that still exists in ZK but is no longer used (and therefore has no partitions), or to starting a listener for a topic that hasn't been created yet. I would like the consumer not to crash in these cases.

Also, some funny Scala <-> Java interop:

Exception in thread "main" java.lang.NoSuchMethodError: kafka.common.ErrorMapping.InvalidTopicCode()S
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:619)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer081.<init>(FlinkKafkaConsumer081.java:55)


Re: FlinkKafkaConsumer and multiple topics

rmetzger0
Hi,

Did you manually add a Kafka dependency to your project? Maybe you are overriding the Kafka version with a lower one? A NoSuchMethodError like this usually means that the Kafka classes on the classpath are a different version from the one the connector was compiled against.

I'm sorry that our consumer crashes when it's supposed to read an invalid topic, but in general that's good behavior ;)

Maybe you can check whether the topic exists from your user code? The getPartitionsForTopic() method is actually a public static method that you can call.
If it throws an exception, the topic doesn't exist anymore.


Robert
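
The suggested pre-check could look roughly like the sketch below. The `PartitionLookup` interface is a hypothetical stand-in for a call such as getPartitionsForTopic(); its parameters and return type are assumptions made for illustration, not the connector's actual signature. The idea is simply to probe each topic first and skip the ones whose lookup throws or returns no partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopicExistenceCheck {

    /** Hypothetical stand-in for a partition lookup that throws for unknown topics. */
    interface PartitionLookup {
        List<Integer> partitionsFor(String topic) throws Exception;
    }

    /** Keeps only topics whose lookup succeeds and returns at least one partition. */
    static List<String> usableTopics(List<String> candidates, PartitionLookup lookup) {
        List<String> usable = new ArrayList<>();
        for (String topic : candidates) {
            try {
                if (!lookup.partitionsFor(topic).isEmpty()) {
                    usable.add(topic);
                }
            } catch (Exception e) {
                // Lookup failed: treat the topic as missing and skip it instead of crashing.
                System.err.println("Skipping topic " + topic + ": " + e.getMessage());
            }
        }
        return usable;
    }

    public static void main(String[] args) {
        // Fake lookup for illustration: "stale" has no partitions, "missing" throws.
        PartitionLookup fake = topic -> {
            if (topic.equals("missing")) {
                throw new IllegalStateException("topic does not exist");
            }
            if (topic.equals("stale")) {
                return new ArrayList<>();
            }
            return Arrays.asList(0, 1);
        };
        System.out.println(usableTopics(Arrays.asList("orders", "stale", "missing"), fake));
    }
}
```

Consumers would then be created only for the topics that survive the check.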


Re: FlinkKafkaConsumer and multiple topics

Jakob Ericsson
We rebalanced some topics in our Kafka cluster today. I had a Flink job running, and it crashed when some of the partitions were moved; other (non-Flink) consumers continued to work.

Should I configure it differently or could this be a bug? 

09/24/2015 15:34:31     Source: Custom Source(3/4) switched to FAILED
java.lang.Exception: Error while fetching from broker:
Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at java.lang.Class.newInstance(Class.java:442)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)

        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while fetching from broker:
Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at java.lang.Class.newInstance(Class.java:442)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)

        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)



Re: FlinkKafkaConsumer and multiple topics

rmetzger0
Hi Jakob,

What exactly do you mean by rebalancing topics? Did the leaders of the partitions change?
Were topics deleted?

Flink's KafkaConsumer does not try to recover from these exceptions. We rely on Flink's fault tolerance mechanisms to restart the data consumption (from the last valid offset).
Have you set setNumberOfExecutionRetries() on the ExecutionConfig?
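
To illustrate the restart-based recovery described above, here is a small simulation. It is not Flink code: the class, the method, and the per-record offset commit are simplifying assumptions. It only shows the principle that a failed read is retried from the last valid offset up to a configured number of times, and that with zero retries the first failure is final.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RestartFromLastOffset {

    /**
     * Reads all records, restarting from the last committed offset after a failure.
     * failAtOffset simulates one transient broker error (e.g. a partition being moved).
     */
    static List<String> consumeWithRetries(List<String> records, int failAtOffset, int maxRetries) {
        List<String> output = new ArrayList<>();
        int committedOffset = 0;       // next offset to read after a restart
        boolean alreadyFailed = false; // the simulated error happens only once
        int attempts = 0;
        while (true) {
            try {
                for (int offset = committedOffset; offset < records.size(); offset++) {
                    if (offset == failAtOffset && !alreadyFailed) {
                        alreadyFailed = true;
                        throw new IllegalStateException("simulated fetch error at offset " + offset);
                    }
                    output.add(records.get(offset));
                    committedOffset = offset + 1;
                }
                return output;
            } catch (IllegalStateException e) {
                attempts++;
                if (attempts > maxRetries) {
                    throw e; // retries exhausted: the job fails for good
                }
                // Otherwise loop again and resume from committedOffset.
            }
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d");
        // One simulated failure at offset 2, up to 3 retries allowed.
        System.out.println(consumeWithRetries(records, 2, 3));
    }
}
```

With `maxRetries` set to 0 the same call throws on the first error, which corresponds to a job that switches to FAILED when no execution retries are configured.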



Re: FlinkKafkaConsumer and multiple topics

Jakob Ericsson
What I actually meant was partition reassignment (https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool).
No topics were deleted. 
We added a bunch of new servers and needed to reassign some partitions to spread the load.

No, I haven't set setNumberOfExecutionRetries().


Re: FlinkKafkaConsumer and multiple topics

Stephan Ewen
The new KafkaConsumer for Kafka 0.9 should be able to handle this, since the Kafka client code itself will then support it.

For 0.8.x, we would need to implement support for recovery inside the consumer ourselves, which is why we initially decided to let job recovery take care of it.
If that becomes much of an issue, we can look into this again...
