FlinkKafkaConsumer bootstrap.servers vs. broker hosts

FlinkKafkaConsumer bootstrap.servers vs. broker hosts

Juho Autio
The FlinkKafkaConsumer takes two arguments: --bootstrap.servers (Kafka servers) and --zookeeper.connect (ZooKeeper servers). It then seems to resolve the Kafka brokers from ZooKeeper and uses those host names to consume from Kafka. But it also seems to connect to the given bootstrap servers to fetch some metadata.
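
For reference, this is roughly how I'm wiring the consumer up (just a sketch; I'm assuming the 0.8.2 connector class FlinkKafkaConsumer082 here, and the group id is a placeholder):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    // addresses that are reachable from the client
    props.setProperty("bootstrap.servers", "10.0.3.85:9092,10.0.3.93:9092");
    props.setProperty("zookeeper.connect", "10.0.3.38:2181,10.0.3.48:2181,10.0.3.69:2181");
    props.setProperty("group.id", "test-group"); // placeholder

    DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer082<String>("test.topic", new SimpleStringSchema(), props));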

This is problematic when ZooKeeper knows the brokers by internal host names: consuming won't work when those resolved Kafka hosts can't be reached.

Could the consumer be changed to
- respect the provided Kafka hosts / IPs and not use the host names resolved from ZooKeeper
and, optionally,
- not require the bootstrap.servers argument at all, and just resolve the broker host names from ZooKeeper and use those as the "bootstrap servers", too?

Is the concept of a bootstrap server something else entirely, or what am I missing here?

Thanks!

Re: FlinkKafkaConsumer bootstrap.servers vs. broker hosts

rmetzger0
Hi Juho,

sorry for the late reply, I was busy with Flink Forward :)

The Flink Kafka Consumer needs both addresses.
Kafka uses the bootstrap servers to connect to the brokers to consume messages.
The ZooKeeper connection is used to commit the offsets of the consumer group once a state snapshot in Flink has been completed.
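
For example, with the Kafka 0.8-style offset layout our consumer uses, the committed offset for one partition ends up under a ZooKeeper path like this (the group id and topic here are just examples):

    /consumers/test-group/offsets/test.topic/0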

Our consumers are pretty minimalistic because we are waiting for the Kafka project to finish their new consumer API [2]. It seems that it's a matter of weeks until the Kafka project releases the new consumer [1].
With the new consumer API from Kafka, we can actually give Flink users a lot more features, for example subscribing to multiple topics with one source instance, committing offsets to the brokers, etc.
I think the new consumer will not need both bootstrap servers and ZooKeeper.
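
To sketch what that could look like (based on the new consumer API as currently proposed, so the details may still change before the release):

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // no zookeeper.connect
    props.put("group.id", "my-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
        props, new StringDeserializer(), new StringDeserializer());
    consumer.subscribe(Arrays.asList("topic-a", "topic-b")); // several topics, one instance
    ConsumerRecords<String, String> records = consumer.poll(100);
    consumer.commitSync(); // offsets are committed to the brokers, not to ZooKeeper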


Is the problem you're reporting preventing you from using the KafkaConsumer in production?
If so, can you tell me when exactly the data consumption is failing? (maybe with some logs / stacktrace?)

If this contains confidential information, you can also send me a private mail.


Regards,
Robert

Re: FlinkKafkaConsumer bootstrap.servers vs. broker hosts

Juho Autio
Hi,

Don't worry, this is quite a low-priority question. It's definitely not a production issue, and as a workaround it can be fixed rather easily with a suitable network setup. This kind of network scenario is probably quite rare, too.

But I think that it might be possible to improve how FlinkKafkaConsumer handles the host names. I'll leave it for you to consider :)

You wrote:
"Kafka uses the bootstrap servers to connect to the brokers to consume messages."
- but it seems that it's actually using the Kafka broker hosts that ZooKeeper tells it about?

Let me first clarify the particular network setup where this becomes a problem:

- the ZooKeeper & Kafka hosts are within the same internal network
- the client with the FlinkKafkaConsumer is outside that network
- the ZooKeeper & Kafka hosts are available to the external client via public IPs; for example, one of the Kafka nodes: 10.0.3.85
- ZooKeeper knows Kafka by host names that are only configured within that network, not by the public IPs; for example kafka8v-internal-10-0-3-85.

Now, the FlinkKafkaConsumer connects to
- the Kafka public IPs as configured (works)
- the ZooKeeper public IPs as configured (works)
- the Kafka internal host names as given by ZooKeeper (fails, because the host names don't resolve)
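
(This is easy to verify from the client with a plain lookup, e.g. "host kafka8v-internal-10-0-3-85", which fails outside the internal network.)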

You can find some log output and a stacktrace at the end of this message.

Obviously this can be fixed by network configuration in two ways:
a) add the internal host names to /etc/hosts on the client node, mapping them to their public IPs (see the example below)
b) change the ZooKeeper configuration so that Kafka is accessed via the public IPs instead (the FlinkKafkaConsumer gets those from ZooKeeper)
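
For (a), the mapping on the client node would look something like this (host names and IPs taken from the logs below):

    # /etc/hosts on the client machine
    10.0.3.85   kafka8v-internal-10-0-3-85
    10.0.3.93   kafka8v-internal-10-0-3-93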

To me it seems that the FlinkKafkaConsumer could still use the IPs (or host names) that are given as arguments, and not the broker host names retrieved from ZooKeeper. But I may be wrong about that, if the concepts of Kafka bootstrap server and broker are entirely different.

I'm just trying to understand how this works and whether it would even make sense for the FlinkKafkaConsumer to use the provided bootstrap servers for consuming. If yes, I would say my original conclusion holds, i.e. the bootstrap server hosts wouldn't be needed as an argument, or alternatively the FlinkKafkaConsumer wouldn't need to ask ZooKeeper for the Kafka hosts.

Cheers,
Juho


The log output & stacktrace:

10:24:43.306 [main] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to get topic metadata from broker 10.0.3.85:9092 in try 0/3
10:24:48.270 [Custom Source (2/4)] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=10.0.3.38,10.0.3.48,10.0.3.69 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@a05936f
10:24:48.270 [Custom Source (1/4)] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=10.0.3.38,10.0.3.48,10.0.3.69 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@58f47fb0
10:24:48.302 [Custom Source (10.0.3.48:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 10.0.3.48/10.0.3.48:2181. Will not attempt to authenticate using SASL (unknown error)
10:24:48.303 [Custom Source (10.0.3.38:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 10.0.3.38/10.0.3.38:2181. Will not attempt to authenticate using SASL (unknown error)
10:24:48.444 [Custom Source (10.0.3.48:2181)] INFO  org.apache.zookeeper.ClientCnxn - Socket connection established to 10.0.3.48/10.0.3.48:2181, initiating session
10:24:48.445 [Custom Source (10.0.3.38:2181)] INFO  org.apache.zookeeper.ClientCnxn - Socket connection established to 10.0.3.38/10.0.3.38:2181, initiating session
10:24:48.595 [Custom Source (10.0.3.48:2181)] INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server 10.0.3.48/10.0.3.48:2181, sessionid = 0x24f8e60da6141d9, negotiated timeout = 6000
10:24:48.595 [Custom Source (10.0.3.38:2181)] INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server 10.0.3.38/10.0.3.38:2181, sessionid = 0x15004816d432073, negotiated timeout = 6000
10:24:48.763 [Custom Source (1/4)] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Reading from partitions {test.topic-0=-915623761776} using the legacy fetcher
10:24:48.764 [Custom Source (2/4)] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Reading from partitions {test.topic-1=-915623761776} using the legacy fetcher
10:24:48.764 [Thread-16] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to get topic metadata from broker 10.0.3.93:9092 in try 0/3
10:24:48.764 [Thread-17] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Trying to get topic metadata from broker 10.0.3.85:9092 in try 0/3
10:24:49.052 [Custom Source (2/4)] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Starting thread SimpleConsumer - StreamSource - broker-85 (kafka8v-internal-10-0-3-85:9092)
10:24:49.052 [Custom Source (1/4)] INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer - Starting thread SimpleConsumer - StreamSource - broker-93 (kafka8v-internal-10-0-3-93:9092)
10:24:49.150 [Custom Source (1/4)] ERROR org.apache.flink.streaming.runtime.tasks.SourceStreamTask - Custom Source (1/4) failed
java.lang.Exception: null
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:241) ~[flink-connector-kafka-0.9.1.jar:0.9.1]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382) ~[flink-connector-kafka-0.9.1.jar:0.9.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:49) ~[flink-streaming-core-0.9.1.jar:0.9.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:55) ~[flink-streaming-core-0.9.1.jar:0.9.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) [flink-runtime-0.9.1.jar:0.9.1]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:na]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.10-0.8.2.1.jar:na]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:498) ~[flink-connector-kafka-0.9.1.jar:0.9.1]
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:375) ~[flink-connector-kafka-0.9.1.jar:0.9.1]