I have seen this in the past and am running into it again.
I have a Kafka consumer that is not getting all the records from the topic. Kafka confirms there are 300k messages in each partition, but Flink only sees a total of 8000 records in the source.

Kafka is 2.0, Flink is 1.4.2, and the connector is FlinkKafkaConsumer011.

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", servers);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("flink.partition-discovery.interval-millis", "10000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    DataStream<String> ds = consumers.get(a.eventType);
    if (ds == null) {
        FlinkKafkaConsumer011<String> cons = new FlinkKafkaConsumer011<String>(
                topic, new SimpleStringSchema(), props);
        cons.setStartFromEarliest();
        ds = env.addSource(cons).name(et.name).rebalance();
        consumers.put(a.eventType, ds);
    }

I am about to rip out the Kafka consumer and build a source using the client library, which has been 100% reliable in working with Kafka. Any pointers welcome.

Michael
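One way to double-check the broker-side count independently of any consumer is to compare each partition's beginning and end offsets. In practice the two maps below would come from `KafkaConsumer.beginningOffsets()` and `KafkaConsumer.endOffsets()`; here they are filled with made-up numbers so the sketch is self-contained, and the class and helper names are my own invention:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCountCheck {

    // Sums (endOffset - beginningOffset) over all partitions; this is the
    // number of records a start-from-earliest consumer should be able to read.
    static long totalAvailable(Map<Integer, Long> beginning, Map<Integer, Long> end) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : end.entrySet()) {
            total += e.getValue() - beginning.getOrDefault(e.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // In a real check these maps come from
        // consumer.beginningOffsets(partitions) / consumer.endOffsets(partitions).
        Map<Integer, Long> beginning = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        beginning.put(0, 0L);
        end.put(0, 300_000L);
        beginning.put(1, 0L);
        end.put(1, 300_000L);
        System.out.println(totalAvailable(beginning, end)); // prints 600000
    }
}
```

If this total disagrees with what the Flink job reports as consumed, the gap is on the consuming side rather than in the topic itself.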
It looks like it is some issue with backpressure, as the same behavior happens when the client library is used directly as a custom source.
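That would be consistent with backpressure: when a downstream operator can't keep up, the source is throttled regardless of which Kafka client it uses, so the remaining records aren't lost, just not pulled yet. A minimal stdlib-only sketch (no Flink APIs; all names here are invented for illustration) of how a bounded buffer forces a fast producer down to a slow consumer's rate:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {
    public static void main(String[] args) throws Exception {
        // Small bounded buffer between "source" and "sink" -- analogous to the
        // buffers that propagate backpressure upstream in a streaming job.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(10);

        Thread sink = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    buffer.take();
                    Thread.sleep(5); // slow sink: ~200 records/sec
                }
            } catch (InterruptedException ignored) {
            }
        });
        sink.start();

        long start = System.nanoTime();
        for (int i = 0; i < 100; i++) {
            buffer.put(i); // blocks when the buffer is full -> source throttled
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        sink.join();

        // The producer could emit all 100 records instantly, but the full
        // buffer forces it down to roughly the sink's rate.
        System.out.println("producer took ~" + elapsedMs + " ms");
    }
}
```

Flink's web UI exposes per-task backpressure sampling, which is a quicker way to confirm whether the source is actually being throttled before rewriting it.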
Michael

> On Aug 16, 2018, at 6:59 PM, TechnoMage <[hidden email]> wrote: