timeout error while connecting to Kafka

timeout error while connecting to Kafka

Eyal Pe'er

Hi,

I'm trying to consume events from Kafka using Apache Flink.

The code is very basic: it connects to the topic, splits each message into words on spaces, and prints the words to the console. The Kafka version is 0.9.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class KafkaStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka servers:9092...");
        props.setProperty("group.id", "flinkPOC");
        FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

        DataStream<String> dataStream = env.addSource(consumer);

        DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
        wordDataStream.print();
        env.execute("Word Split");
    }

    public static class Splitter implements FlatMapFunction<String, String> {

        public void flatMap(String sentence, Collector<String> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(word);
            }
        }
    }
}

 

The app does not print anything to the screen, although I produced events to Kafka.

I tried removing the Splitter flatMap function, but still nothing happens. Kafka does not require SSL or any other kind of authentication.

This is the error that I found in the logs:

2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

 

The Kafka topic has only one partition, so the topic metadata should be very basic.

I ran Kafka and Flink locally to rule out network-related issues, but the problem persists. So my assumption is that I’m doing something wrong…

Has anyone encountered this issue? Does anyone have different code for consuming Kafka events?

 

Best regards

Eyal Peer / Data Platform Developer

 


Re: timeout error while connecting to Kafka

miki haiat
Can you double-check that the Kafka instance is up?
The code looks fine.


Best,

Miki 


Re: timeout error while connecting to Kafka

Qi Kang
In reply to this post by Eyal Pe'er
The code itself is fine. Turning the app’s log level to DEBUG will give you more information.

BTW, please make sure that the addresses of Kafka brokers are properly resolved.
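One way to act on the address-resolution suggestion is a small standalone check, run from the same machine as the Flink job. The sketch below uses only the Java standard library. The class name `BrokerCheck`, the default address `localhost:9092`, and the 3-second timeout are assumptions for illustration, not anything from Flink or Kafka.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink or Kafka.
class BrokerCheck {

    // Splits a bootstrap.servers value such as "host1:9092,host2:9092"
    // into {host, port} pairs.
    static List<String[]> parse(String bootstrapServers) {
        List<String[]> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            result.add(new String[] {trimmed.substring(0, colon), trimmed.substring(colon + 1)});
        }
        return result;
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: a single local broker on the default port.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (String[] hostPort : parse(servers)) {
            String host = hostPort[0];
            int port = Integer.parseInt(hostPort[1]);
            try {
                System.out.println(host + " resolves to " + InetAddress.getByName(host).getHostAddress());
            } catch (UnknownHostException e) {
                System.out.println(host + " does not resolve");
                continue;
            }
            System.out.println(host + ":" + port
                    + (reachable(host, port, 3000) ? " is reachable" : " is NOT reachable"));
        }
    }
}
```

Pass the same string used for `bootstrap.servers` as the first argument. A successful TCP connection only shows that something is listening on that port; it does not prove that the broker can serve metadata for the topic.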



RE: timeout error while connecting to Kafka

Eyal Pe'er
In reply to this post by miki haiat

Hi Miki,

First, I would like to thank you for the fast response.

I rechecked Kafka, and it is up and running fine.

I’m still getting the same error (Timeout expired while fetching topic metadata).

Maybe my Flink version is wrong (Kafka version is 0.9)?

 

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

 

 

Best regards

Eyal Peer / Data Platform Developer

 


Re: timeout error while connecting to Kafka

miki haiat
Can you try removing this from your pom file?

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

Is there any reason why you are using Flink 1.5 and not the latest release?



Best,


Miki
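The dependency list posted earlier in the thread mixes three Flink versions (1.5.0, 1.1.4, 1.7.0) and two Scala suffixes (_2.10, _2.11). Flink artifacts in one project are generally expected to share a single version and a single Scala suffix, so removing one entry may not be enough. The following is a minimal sketch that makes the mismatch visible. The class `FlinkDependencyCheck` and the `artifactId:version` string format are hypothetical conveniences for this example, not a Maven or Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; it inspects plain "artifactId:version" strings.
class FlinkDependencyCheck {

    // Collects the distinct versions among the given coordinates.
    static Set<String> versions(List<String> coordinates) {
        Set<String> out = new TreeSet<>();
        for (String c : coordinates) {
            out.add(c.substring(c.lastIndexOf(':') + 1));
        }
        return out;
    }

    // Collects the distinct Scala suffixes (the text after the last '_' in the
    // artifactId), skipping artifacts that have no suffix.
    static Set<String> scalaSuffixes(List<String> coordinates) {
        Set<String> out = new TreeSet<>();
        for (String c : coordinates) {
            String artifactId = c.substring(0, c.lastIndexOf(':'));
            int underscore = artifactId.lastIndexOf('_');
            if (underscore >= 0) {
                out.add(artifactId.substring(underscore + 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The artifacts and versions as posted in this thread.
        List<String> deps = Arrays.asList(
                "flink-core:1.5.0",
                "flink-connector-kafka-0.11_2.11:1.5.0",
                "flink-streaming-java_2.11:1.5.0",
                "flink-java:1.5.0",
                "flink-clients_2.10:1.1.4",
                "flink-connector-kafka_2.11:1.7.0");
        System.out.println("Versions: " + versions(deps));             // [1.1.4, 1.5.0, 1.7.0]
        System.out.println("Scala suffixes: " + scalaSuffixes(deps));  // [2.10, 2.11]
    }
}
```

More than one entry in either set suggests the pom is worth aligning before looking further at the Kafka side.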



RE: timeout error while connecting to Kafka

Eyal Pe'er

Hi,

I removed that dependency, but it still fails.

The reason I used Flink 1.5.0 is that I followed a tutorial that used it (https://www.baeldung.com/kafka-flink-data-pipeline).

If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to connect to ZooKeeper instead of the bootstrap servers?

I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").

I saw that in the Apache Flink Kafka connector, zookeeper.connect is only required for Kafka 0.8, but maybe I still need to use it?

Best regards

Eyal Peer / Data Platform Developer
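On the ZooKeeper question: as noted in the message above, the Flink Kafka connector only requires `zookeeper.connect` for Kafka 0.8, and the Kafka 0.9 consumer client talks to the brokers directly. The sketch below contrasts the two property sets using only `java.util.Properties`. The class `ConsumerProps`, its method names, and the `localhost` addresses are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Flink connector.
class ConsumerProps {

    // Properties for the 0.9 consumer: broker list and consumer group only.
    static Properties forKafka09(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        return props;
    }

    // Properties for the 0.8 consumer, which additionally needs ZooKeeper.
    static Properties forKafka08(String bootstrapServers, String zookeeperConnect, String groupId) {
        Properties props = forKafka09(bootstrapServers, groupId);
        props.setProperty("zookeeper.connect", zookeeperConnect);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: local broker and ZooKeeper on their default ports.
        System.out.println(forKafka09("localhost:9092", "flinkPOC"));
        System.out.println(forKafka08("localhost:9092", "localhost:2181", "flinkPOC"));
    }
}
```

The object returned by `forKafka09` could be passed as the third constructor argument of `FlinkKafkaConsumer09` in the original program, in place of the inline `Properties` setup.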

 


RE: timeout error while connecting to Kafka

Eyal Pe'er

BTW, the exception that I see in the log is: ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception occurred in REST handler…

Best regards

Eyal Peer / Data Platform Developer

 


Re: timeout error while connecting to Kafka

Yitzchak Lieberman
What is the topic's replication factor? How many Kafka brokers do you have?
I faced the same exception when one of my brokers was down and the topic had no replicas (replication_factor=1).


import java.util.Properties;

 

public class KafkaStreaming {

 

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

    Properties props = new Properties();

    props.setProperty("bootstrap.servers", "kafka servers:9092...");

    props.setProperty("group.id", "flinkPOC");

    FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

 

    DataStream<String> dataStream = env.addSource(consumer);

 

    DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());

    wordDataStream.print();

    env.execute("Word Split");

 

}

 

public static class Splitter implements FlatMapFunction<String, String> {

 

    public void flatMap(String sentence, Collector<String> out) throws Exception {

 

        for (String word : sentence.split(" ")) {

            out.collect(word);

        }

    }

 

}

}

 

The app does not print anything to the screen (although I produced events to Kafka).

I tried to skip the Splitter FlatMap function, but still nothing happens. SSL or any kind of authentication is not required from Kafka.

This is the error that I found in the logs:

2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

 

The Kafka’s topic has only one partition, so the topic metadata supposed to be very basic.

I ran Kafka and the Flink locally in order to eliminate network related issues, but the issue persists. So my assumption is that I’m doing something wrong…

Did you encounter such issue? Does someone have different code for consuming Kafka events ?

 

Best regards

Eyal Peer / Data Platform Developer

 

Reply | Threaded
Open this post in threaded view
|

RE: timeout error while connecting to Kafka

Eyal Pe'er

The replication factor is 1, as it is for most of my topics.

Is it a problem to consume events from non-replicated topics?

 

Best regards

Eyal Peer / Data Platform Developer
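
A timeout while fetching topic metadata can occur when the client cannot reach a broker at all, as in the broker-down case described in this thread. As a minimal sketch (not part of Flink or Kafka; the class name BrokerReachability and the 3000 ms timeout are assumptions chosen for illustration), the following plain-Java check parses a bootstrap.servers value and tries to open a TCP connection to each entry. A successful connection only shows that the port is open, not that the broker serves metadata correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink or Kafka: checks raw TCP reachability of brokers. */
final class BrokerReachability {

    /** Splits a bootstrap.servers value such as "host1:9092,host2:9092" into host/port pairs. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved defers the DNS lookup until a connection is attempted.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection to the address can be opened within timeoutMs. */
    static boolean isReachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here; an unknown host surfaces as an IOException on connect.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost:9092" is only a placeholder default; pass the real bootstrap.servers value.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(servers)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 3000));
        }
    }
}
```

Running the check from the same machine (or container) that runs the Flink TaskManager matters, since that is where the Kafka consumer actually opens its connections.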

 


Re: timeout error while connecting to Kafka

miki haiat
In reply to this post by Eyal Pe'er
Did you try submitting it to a remote cluster?



RE: timeout error while connecting to Kafka

Eyal Pe'er

What do you mean by “remote cluster”?

I tried running a dockerized Flink version (https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html) on a remote machine and submitting a job that is supposed to communicate with Kafka, but I still cannot access the topic.

 

 

Best regards

Eyal Peer / Data Platform Developer
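
One thing worth ruling out with a dockerized Flink, though the thread does not confirm it as the cause: inside a container, localhost and 127.0.0.1 refer to the container itself, so a bootstrap address that worked when Kafka and Flink ran on the same host may no longer point at the broker. A small sketch using only the JDK (the class name LoopbackCheck and the sample hosts are hypothetical) that classifies a host name accordingly:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper, not part of Flink or Kafka. */
final class LoopbackCheck {

    enum Kind { LOOPBACK, NON_LOOPBACK, UNRESOLVED }

    /** Classifies a host name or IP literal by what it resolves to where this JVM runs. */
    static Kind classify(String host) {
        try {
            InetAddress address = InetAddress.getByName(host);
            return address.isLoopbackAddress() ? Kind.LOOPBACK : Kind.NON_LOOPBACK;
        } catch (UnknownHostException e) {
            // The name cannot be resolved from this machine or container.
            return Kind.UNRESOLVED;
        }
    }

    public static void main(String[] args) {
        // Sample hosts are placeholders; pass the hosts from bootstrap.servers instead.
        String[] hosts = args.length > 0 ? args : new String[] {"localhost", "127.0.0.1"};
        for (String host : hosts) {
            System.out.println(host + " -> " + classify(host));
        }
    }
}
```

The result is only meaningful when the class is run inside the Flink container itself, because name resolution there can differ from the host machine.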

 


Re: timeout error while connecting to Kafka

miki haiat
I'm trying to understand.
Did you submit your jar through the Flink web UI,
and then get the timeout error?


RE: timeout error while connecting to Kafka

Eyal Pe'er

Nope, I submitted it through the Flink job master itself by running: flink run -c <entrypoint> sandbox.jar

 

Best regards

Eyal Peer / Data Platform Developer

 

From: miki haiat <[hidden email]>
Sent: Sunday, August 25, 2019 4:21 PM
To: Eyal Pe'er <[hidden email]>
Cc: user <[hidden email]>
Subject: Re: timeout error while connecting to Kafka

 

I'm trying to understand.

Did you submitted your jar throw the flink web UI ,

And then you got the time out error ?

 

On Sun, Aug 25, 2019, 16:10 Eyal Pe'er <[hidden email]> wrote:

What do you mean by “remote cluster”?

I tried to run dockerized Flink version (https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html) on a remote machine and to submit a job that supposed to communicate with Kafka, but still I cannot access the topic.

 

 

Best regards

Eyal Peer / Data Platform Developer

 

From: miki haiat <[hidden email]>
Sent: Sunday, August 25, 2019 3:50 PM
To: Eyal Pe'er <[hidden email]>
Cc: [hidden email]
Subject: Re: timeout error while connecting to Kafka

 

Did you try to submit it to  remote cluster ?

 

 

On Sun, Aug 25, 2019 at 2:55 PM Eyal Pe'er <[hidden email]> wrote:

BTW, the exception that I see in the log is: ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception occurred in REST handler…

Best regards

Eyal Peer / Data Platform Developer

 

From: Eyal Pe'er <[hidden email]>
Sent: Sunday, August 25, 2019 2:20 PM
To: miki haiat <[hidden email]>
Cc: [hidden email]
Subject: RE: timeout error while connecting to Kafka

 

Hi,

I removed that dependency, but it still fails.

The reason I used Flink 1.5.0 is that I followed a tutorial which used it (https://www.baeldung.com/kafka-flink-data-pipeline).

If needed, I can change it.

 

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to connect to ZooKeeper instead of the bootstrap servers?

I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").

I saw that in the Apache Flink Kafka connector, zookeeper.connect is only required for Kafka 0.8, but maybe I still need to use it?
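For reference, a minimal sketch of the consumer properties in question, assuming (as the connector note mentioned above says) that zookeeper.connect is only needed for Kafka 0.8 and that 0.9 relies on bootstrap.servers alone. The class name Kafka09ConsumerProps and the address localhost:9092 are hypothetical, and the host:port check is an added convenience, not something Flink or Kafka provides.

```java
import java.util.Properties;

final class Kafka09ConsumerProps {

    // Builds the two properties used in this thread; no zookeeper.connect is set.
    // Rejects entries that are not of the form host:port (for example, a host containing a space).
    static Properties build(String bootstrapServers, String groupId) {
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1 || hostPort.contains(" ")) {
                throw new IllegalArgumentException("expected host:port, got: " + entry);
            }
            // NumberFormatException is a subclass of IllegalArgumentException.
            Integer.parseInt(hostPort.substring(colon + 1));
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical local broker address; replace with the real one.
        Properties props = build("localhost:9092", "flinkPOC");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object can be passed to the FlinkKafkaConsumer09 constructor in the same way as in the original program.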

Best regards

Eyal Peer / Data Platform Developer

 

From: miki haiat <[hidden email]>
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er <[hidden email]>
Cc: [hidden email]
Subject: Re: timeout error while connecting to Kafka

 

Can you try removing this from your pom file?

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

Is there any reason why you are using Flink 1.5 and not the latest release?

 

 

Best,


Miki

 

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er <[hidden email]> wrote:

Hi Miki,

First, I would like to thank you for the fast response.

I rechecked Kafka and it is up and running fine.

I’m still getting the same error (Timeout expired while fetching topic metadata).

Maybe my Flink version is wrong (Kafka version is 0.9)?

 

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
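
One thing that stands out in the dependency list above is that the artifacts do not share a single Flink version or Scala suffix. Whether that is the cause of the timeout is not established in this thread; the sketch below only makes the mix visible. The class PomVersionCheck is hypothetical, and the map is copied by hand from the quoted pom.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class PomVersionCheck {

    // artifactId -> version, copied by hand from the pom quoted in this message.
    static Map<String, String> quotedPom() {
        Map<String, String> deps = new LinkedHashMap<>();
        deps.put("flink-core", "1.5.0");
        deps.put("flink-connector-kafka-0.11_2.11", "1.5.0");
        deps.put("flink-streaming-java_2.11", "1.5.0");
        deps.put("flink-java", "1.5.0");
        deps.put("flink-clients_2.10", "1.1.4");
        deps.put("flink-connector-kafka_2.11", "1.7.0");
        return deps;
    }

    // Distinct versions across all artifacts, in sorted order.
    static Set<String> distinctVersions(Map<String, String> deps) {
        return new TreeSet<>(deps.values());
    }

    // Distinct Scala suffixes (the part after the last '_'); artifacts without one are skipped.
    static Set<String> scalaSuffixes(Map<String, String> deps) {
        Set<String> suffixes = new TreeSet<>();
        for (String artifactId : deps.keySet()) {
            int idx = artifactId.lastIndexOf('_');
            if (idx >= 0) {
                suffixes.add(artifactId.substring(idx + 1));
            }
        }
        return suffixes;
    }

    public static void main(String[] args) {
        Map<String, String> deps = quotedPom();
        System.out.println("versions: " + distinctVersions(deps));
        System.out.println("scala suffixes: " + scalaSuffixes(deps));
    }
}
```

More than one entry in either set means the pom mixes releases or Scala builds, which is worth cleaning up before looking further.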

 

 

Best regards

Eyal Peer / Data Platform Developer

 

From: miki haiat <[hidden email]>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er <[hidden email]>
Cc: [hidden email]
Subject: Re: timeout error while connecting to Kafka

 

Can you double-check that the Kafka instance is up?
The code looks fine.
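
A quick way to do this check from the machine that runs the Flink job is a plain TCP connect to the broker port. This is a minimal sketch using only the JDK; the class name BrokerPortCheck, the default address localhost:9092, and the 3-second timeout are assumptions. An open port does not prove that metadata requests will succeed, so a true result is only a first step.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    // Unresolvable hosts, refused connections, and timeouts all return false.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are placeholders; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

Running it from the same host (or container) as the Flink TaskManager matters, since that is where the Kafka consumer actually opens its connections.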

 

 

Best,

 

Miki 

 

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er <[hidden email]> wrote:

Hi,

I'm trying to consume events using Apache Flink.

The code is very basic: it connects to the topic, splits words by space, and prints them to the console. Kafka version is 0.9.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka servers:9092...");
        props.setProperty("group.id", "flinkPOC");
        FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

        DataStream<String> dataStream = env.addSource(consumer);

        DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
        wordDataStream.print();
        env.execute("Word Split");
    }

    public static class Splitter implements FlatMapFunction<String, String> {

        public void flatMap(String sentence, Collector<String> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(word);
            }
        }
    }
}

 

The app does not print anything to the screen (although I produced events to Kafka).

I tried to skip the Splitter FlatMap function, but still nothing happens. SSL or any kind of authentication is not required from Kafka.

This is the error that I found in the logs:

2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

 

The Kafka topic has only one partition, so the topic metadata is supposed to be very basic.

I ran Kafka and Flink locally in order to eliminate network-related issues, but the issue persists. So my assumption is that I’m doing something wrong…

Have you encountered such an issue? Does anyone have different code for consuming Kafka events?

 

Best regards

Eyal Peer / Data Platform Developer

 


RE: timeout error while connecting to Kafka

Eyal Pe'er

Hi,

Brief update.

I ran the same code again, but this time against another Kafka cluster that I have, where the version is 0.11.

The code runs fine, without the timeout exception.

 

In conclusion, it seems that the problem occurs only when consuming events from Kafka 0.9. Currently, I have no idea how to solve it.

If anyone has been able to consume events from Kafka 0.9, please let me know.

 

Best regards

Eyal Peer / Data Platform Developer

 

From: Eyal Pe'er <[hidden email]>
Sent: Sunday, August 25, 2019 4:34 PM
To: miki haiat <[hidden email]>
Cc: user <[hidden email]>
Subject: RE: timeout error while connecting to Kafka

 

Nope, I submitted it through the Flink job master itself by running flink run -c <entrypoint> sandbox.jar

 

Best regards

Eyal Peer / Data Platform Developer

 
