Weird Kafka Connector issue


Weird Kafka Connector issue

Michael Latta
I have been using the Kafka connector successfully for a while now, but am getting weird results in one case.

I have a test that submits 3 streams to Kafka topics and monitors them from a separate process.  The Flink job has a source for each topic, and one of those sources feeds 3 separate map functions that lead to other operators.  This topic shows only 6,097 of 30,000 messages published, and the map functions following the source show only a fraction of that as received.  The consumer is configured to start at the beginning, and in other cases the same code receives all messages published.  The parallelism is 6, if that makes a difference, as is the partitioning on the topics.

The code for creating the consumer for this topic is below.

Any suggestions on why it is missing so many messages would be welcome.

Michael

      String topic = a.kafkaTopicName(ets);
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", servers);
      // Fresh group id per run, so no previously committed offsets apply.
      props.setProperty("group.id", UUID.randomUUID().toString());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      DataStream<String> ds = consumers.get(a.eventType);
      if (ds == null) {
        FlinkKafkaConsumer011<String> cons =
            new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        // Read from the earliest offset, ignoring any committed group offsets.
        cons.setStartFromEarliest();
        ds = env.addSource(cons).name(et.name).rebalance();
        consumers.put(a.eventType, ds);
      }
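For reference, the fan-out described above can be imitated in plain Java (class and names here are hypothetical, and no Flink is involved): when several map operators consume the same DataStream, every record is forwarded to each of them, so each map's received count should match the source's output count.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Plain-Java analogue of one source feeding three map functions.
public class FanOutCounts {

  // Wrap a mapping function so it counts how many records it receives.
  static <T, R> Function<T, R> counted(Function<T, R> fn, AtomicLong counter) {
    return t -> { counter.incrementAndGet(); return fn.apply(t); };
  }

  // Feed every source record to all three maps; return the per-map counts.
  static long[] runFanOut(List<String> source) {
    AtomicLong c1 = new AtomicLong(), c2 = new AtomicLong(), c3 = new AtomicLong();
    Function<String, String> map1 = counted(String::toUpperCase, c1);
    Function<String, String> map2 = counted(s -> s + "!", c2);
    Function<String, Integer> map3 = counted(String::length, c3);
    for (String record : source) {
      map1.apply(record);
      map2.apply(record);
      map3.apply(record);
    }
    return new long[] {c1.get(), c2.get(), c3.get()};
  }

  public static void main(String[] args) {
    long[] counts = runFanOut(List.of("a", "b", "c", "d"));
    // Each map saw exactly as many records as the source emitted.
    System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);  // 4 4 4
  }
}
```

If the web UI shows the maps receiving fewer records than the source emits, something between those operators is dropping or miscounting, which is what makes the numbers above surprising.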


Re: Weird Kafka Connector issue

Michael Latta
Still trying to figure out what is happening with this Kafka topic.

1) No exceptions in the task manager log or the UI exceptions tab.
2) The log confirms the topic partitions are being reset to offset 0.
3) Other topics in this and other tests show full consumption of messages (all JSON-format text).
4) The source shows more records output than are received by 2 of the 3 following stages.

The diagram for the job is below, as is the GUI showing tasks and record counts.






Re: Weird Kafka Connector issue

Tzu-Li (Gordon) Tai
Hi,

Just to clarify your observation here:

Is the problem that the map operators after the “source: travel” Kafka source do not receive all records from the source?
This does seem weird, but so far I don’t have any ideas about how it could be Flink-related.

One other thing to be sure of: have you verified that the outputs of your test are actually incorrect?
Or are you assuming they are incorrect based on the weird metric numbers shown in the web UI?

Cheers,
Gordon




Re: Weird Kafka Connector issue

Michael Latta
I have another Java program reading the topic to monitor the test.  It receives 60,000 records on the “travel” topic, while the Flink Kafka consumer reports only 4,138.  That, plus the mismatch between the source and the map counts, is what seems so weird.  I have several other topics where the source is built with exactly the same code and has multiple maps following it, and there the numbers all match expectations.

Michael



Re: Weird Kafka Connector issue

Michael Latta
Just in case it is a metrics bug, I will add a step to do my own counting in the Flink job.
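One way to sketch that counting (a hypothetical helper, not the actual job code): tally records per Kafka partition as they pass through, assuming the partition number can be surfaced to the job, e.g. by using a KeyedDeserializationSchema instead of SimpleStringSchema.  Comparing the tally with what the producer wrote per partition would show whether whole partitions are missing (pointing at partition assignment) or all partitions are uniformly short.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: count records per Kafka partition as they flow through the job.
public class PartitionTally {

  private final ConcurrentHashMap<Integer, AtomicLong> counts = new ConcurrentHashMap<>();

  // Call from the counting step for every record, with the record's partition.
  public void recordSeen(int partition) {
    counts.computeIfAbsent(partition, p -> new AtomicLong()).incrementAndGet();
  }

  // Snapshot of counts, ordered by partition for readable logging.
  public Map<Integer, Long> snapshot() {
    Map<Integer, Long> out = new TreeMap<>();
    counts.forEach((p, c) -> out.put(p, c.get()));
    return out;
  }

  public static void main(String[] args) {
    PartitionTally tally = new PartitionTally();
    int[] partitions = {0, 1, 0, 2, 1, 0};  // stands in for a stream of record partitions
    for (int p : partitions) tally.recordSeen(p);
    System.out.println(tally.snapshot());  // {0=3, 1=2, 2=1}
  }
}
```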

Michael



RE: Weird Kafka Connector issue

Nortman, Bill
In reply to this post by Michael Latta

We had a similar issue, and it turns out the Flink Kafka connector doesn’t honor the group ID.  This resulted in multiple consumers each getting partial events (basically load-balanced across my multiple consumers).

 

See https://stackoverflow.com/questions/38639019/flink-kafka-consumer-groupid-not-working

From: TechnoMage [mailto:[hidden email]]
Sent: Wednesday, April 25, 2018 8:52 AM
To: Tzu-Li (Gordon) Tai
Cc: user
Subject: Re: Weird Kafka Connector issue

 

This message contains confidential information and is intended only for the individual named. If you are not the named addressee, you should not disseminate, distribute, alter or copy this e-mail. Please notify the sender immediately by e-mail if you have received this e-mail by mistake and delete this e-mail from your system. E-mail transmissions cannot be guaranteed to be secure or without error as information could be intercepted, corrupted, lost, destroyed, arrive late or incomplete, or contain viruses. The sender, therefore, does not accept liability for any errors or omissions in the contents of this message which arise during or as a result of e-mail transmission. If verification is required, please request a hard-copy version. This message is provided for information purposes and should not be construed as a solicitation or offer to buy or sell any securities or related financial instruments in any jurisdiction.  Securities are offered in the U.S. through PIMCO Investments LLC, distributor and a company of PIMCO LLC.

The individual providing the information herein is an employee of Pacific Investment Management Company LLC ("PIMCO"), an SEC-registered investment adviser.  To the extent such individual advises you regarding a PIMCO investment strategy, he or she does so as an associated person of PIMCO.  To the extent that any information is provided to you related to a PIMCO-sponsored investment fund ("PIMCO Fund"), it is being provided to you in the individual's capacity as a registered representative of PIMCO Investments LLC ("PI"), an SEC-registered broker-dealer.  PI is not registered, and does not intend to register, as a municipal advisor and therefore does not provide advice with respect to the investment of the proceeds of municipal securities or municipal escrow investments.  In addition, unless otherwise agreed by PIMCO, this communication and any related attachments are being provided on the express basis that they will not cause PIMCO LLC, or its affiliates, to become an investment advice fiduciary under ERISA or the Internal Revenue Code.


Re: Weird Kafka Connector issue

Michael Latta
So Flink is not using the group ID, but at the same time all other topics do read all published records.  Note that the number of partitions equals the parallelism of the Flink job, so there should be one Flink task per partition.
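A rough sketch of why one-task-per-partition is the expectation (the assignment function below is hypothetical; Flink's connector rotates partitions over subtasks starting from a connector-determined index, so the exact pairing may differ, but the conclusion holds for any start index):

```java
// Sketch of round-robin partition-to-subtask assignment.
public class PartitionAssignment {

  // With any fixed start index, partitions rotate over the subtasks.
  static int assignedSubtask(int partition, int startIndex, int parallelism) {
    return (startIndex + partition) % parallelism;
  }

  static boolean allTrue(boolean[] xs) {
    for (boolean x : xs) if (!x) return false;
    return true;
  }

  public static void main(String[] args) {
    int parallelism = 6;  // matches the job described above
    boolean[] taken = new boolean[parallelism];
    for (int partition = 0; partition < parallelism; partition++) {
      // 6 partitions over 6 subtasks: each subtask gets exactly one partition.
      taken[assignedSubtask(partition, 3, parallelism)] = true;
    }
    System.out.println("every subtask has a partition: " + allTrue(taken));
  }
}
```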

Michael
