FlinkKafkaConsumer09 not reading from all partitions?

Curtis Wilde

I’m just getting my feet wet with Flink, doing a quick implementation on my laptop following the examples.

 

I’m consuming from a Kafka 0.9 cluster using FlinkKafkaConsumer09 from a topic with 50 partitions.

 

I see offset commit messages for partitions 0-9, but I don’t see offsets being committed for partitions 10-49.

 

Any ideas what’s going on?

 

I’ve been messing around with consumer configs to no avail:

 

enable.auto.commit=true
group.id=flink-test
flink.poll-timeout=9223372036854775807
auto.offset.reset=earliest
session.timeout.ms=120000
bootstrap.servers=n1:9092,n2:9092,n3:9092,n4:9092,n5:9092
request.timeout.ms=120500
topic=mytopic
max.partition.fetch.bytes=13631488
auto.commit.interval.ms=5000
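
The job itself is basically the stock example, something like this (trimmed down; the class name is a placeholder and SimpleStringSchema stands in for the real deserializer):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaReadTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "n1:9092,n2:9092,n3:9092,n4:9092,n5:9092");
        props.setProperty("group.id", "flink-test");
        props.setProperty("auto.offset.reset", "earliest");

        // The topic name is passed to the consumer constructor, not via the Properties.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("kafka-read-test");
    }
}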

Re: FlinkKafkaConsumer09 not reading from all partitions?

Stephan Ewen
Is this based on log messages, or can you confirm that from some partitions, no data is read?
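
For example, you could tag each record with the partition it came from and see which partitions actually show up in the output. An untested sketch along these lines (the class name is just a placeholder):

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Emits one string per record, tagged with the partition and offset it came from.
public class PartitionTaggingSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
        return topic + "-" + partition + " @ " + offset;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

You can pass that to the consumer in place of the regular deserialization schema, e.g. new FlinkKafkaConsumer09<String>("mytopic", new PartitionTaggingSchema(), props).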


Re: FlinkKafkaConsumer09 not reading from all partitions?

Curtis Wilde

After enabling logging, I can see that the consumer is only picking up 10 partitions for some reason (see the last two lines below).

 

 

2016-09-20 14:27:47 INFO  ConsumerConfig:165 - ConsumerConfig values:
       metric.reporters = []
       metadata.max.age.ms = 300000
       value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
       group.id = cwilde-flink-test
       partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
       reconnect.backoff.ms = 50
       sasl.kerberos.ticket.renew.window.factor = 0.8
       max.partition.fetch.bytes = 13631488
       bootstrap.servers = [10.13.161.98:9092, 10.13.161.100:9092, 10.13.161.101:9092, 10.13.161.102:9092, 10.13.161.103:9092]
       retry.backoff.ms = 100
       sasl.kerberos.kinit.cmd = /usr/bin/kinit
       sasl.kerberos.service.name = null
       sasl.kerberos.ticket.renew.jitter = 0.05
       ssl.keystore.type = JKS
       ssl.trustmanager.algorithm = PKIX
       enable.auto.commit = true
       ssl.key.password = null
       fetch.max.wait.ms = 500
       sasl.kerberos.min.time.before.relogin = 60000
       connections.max.idle.ms = 540000
       ssl.truststore.password = null
       session.timeout.ms = 120000
       metrics.num.samples = 2
       client.id =
       ssl.endpoint.identification.algorithm = null
       key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
       ssl.protocol = TLS
       check.crcs = true
       request.timeout.ms = 120500
       ssl.provider = null
       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
       ssl.keystore.location = null
       heartbeat.interval.ms = 3000
       auto.commit.interval.ms = 5000
       receive.buffer.bytes = 32768
       ssl.cipher.suites = null
       ssl.truststore.type = JKS
       security.protocol = PLAINTEXT
       ssl.truststore.location = null
       ssl.keystore.password = null
       ssl.keymanager.algorithm = SunX509
       metrics.sample.window.ms = 30000
       fetch.min.bytes = 1
       send.buffer.bytes = 131072
       auto.offset.reset = earliest

2016-09-20 14:27:49 WARN  ConsumerConfig:173 - The configuration topic = eventdata was supplied but isn't a known config.
2016-09-20 14:27:49 INFO  AppInfoParser:82 - Kafka version : 0.9.0.1
2016-09-20 14:27:49 INFO  AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06
2016-09-20 14:27:49 INFO  FlinkKafkaConsumer09:189 - Got 10 partitions from these topics: [eventdata]
2016-09-20 14:27:49 INFO  FlinkKafkaConsumer09:468 - Consumer is going to read the following topics (with number of partitions): eventdata (10),
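
To rule out the Flink side, I can also ask the brokers for the topic metadata directly with a plain Kafka 0.9 consumer pointed at the same bootstrap.servers. A quick sketch (class name and the single broker address are just placeholders):

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.13.161.98:9092"); // same brokers as the Flink job
        props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() fetches the topic metadata from the brokers,
            // so the count reflects whatever cluster bootstrap.servers points at.
            for (PartitionInfo p : consumer.partitionsFor("eventdata")) {
                System.out.println(p);
            }
        }
    }
}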

 

Re: FlinkKafkaConsumer09 not reading from all partitions?

Curtis Wilde

Digging deeper, it looks like this was a configuration issue. I was pulling from a different cluster that, in fact, has only 10 partitions for this topic.

(╯°□°)╯︵ ┻━┻

 
