Kafka connector issue


Kafka connector issue

Michael Latta
I have seen this in the past and am running into it again.

I have a Kafka consumer that is not getting all the records from the topic.  Kafka confirms there are 300k messages in each partition, but Flink only sees a total of 8000 records in the source.

Kafka is 2.0, Flink is 1.4.2, and the connector is FlinkKafkaConsumer011.

      Properties props = new Properties();
      props.setProperty("bootstrap.servers", servers);
      // Fresh group.id per run, so no committed offsets are ever resumed.
      props.setProperty("group.id", UUID.randomUUID().toString());
      props.setProperty("flink.partition-discovery.interval-millis", "10000");
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      DataStream<String> ds = consumers.get(a.eventType);
      if (ds == null) {
        FlinkKafkaConsumer011<String> cons =
            new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        // Forces reading from the earliest offset, overriding both committed
        // offsets and the auto.offset.reset property above.
        cons.setStartFromEarliest();
        ds = env.addSource(cons).name(et.name).rebalance();
        consumers.put(a.eventType, ds);
      }

I am about to rip out the Kafka consumer and build a source using the client library, which has been 100% reliable in working with Kafka.  Any pointers welcome.
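For reference, a custom source built directly on the Kafka client library might look like the sketch below. This is only a minimal illustration, not the connector's implementation: the class name, constructor parameters, and subscribe-from-beginning behavior are assumptions, and it has no checkpointing or exactly-once offset handling.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

// Hypothetical bare-bones source wrapping the plain Kafka client.
public class RawKafkaSource implements SourceFunction<String> {

    private final String servers;
    private final String topic;
    private volatile boolean running = true;

    public RawKafkaSource(String servers, String topic) {
        this.servers = servers;
        this.topic = topic;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Fresh group.id, mirroring the snippet above.
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // collect() blocks when downstream backpressure builds up,
                    // so a slow sink stalls this poll loop as well.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record.value());
                    }
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Note that because collect() participates in Flink's backpressure, a custom source like this will exhibit the same stalling as the built-in connector if the bottleneck is downstream.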

Michael


Re: Kafka connector issue

Michael Latta
It looks like it is some issue with backpressure, since the same behavior happens with the client library used as a custom source.

Michael

> On Aug 16, 2018, at 6:59 PM, TechnoMage <[hidden email]> wrote: