Using GeoIP2 in Flink Streaming

Zhun Shen
Hi there,

In my case, I want to use GeoIP2 in Flink Streaming. I know I need to serialize the GeoIP2-related classes using Kryo, but I don't know how to do it.

Flink version: 1.0.0
Kafka version: 0.9.0.0
Deploy Mode: Local

My demo code is below:

        File database = new File("/home/user/GeoIP2-City.mmdb");
        final DatabaseReader reader = new DatabaseReader.Builder(database).build();
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

        messageStream
                .rebalance()
                .map(new MapFunction<String, String>() {
                    public String map(String value) throws Exception {

                        InetAddress ipAddress = InetAddress.getByName(value);
                        CityResponse response = reader.city(ipAddress);
                        Country country = response.getCountry();
                        return "Kafka and Flink says: " + value + " " + country;
                    }
                }).print();

        env.execute(); 


I got the error below:

Object FlinkTest$1@7c7d3c46 not serializable
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
FlinkTest.main(FlinkTest.java:36)

Can anyone help me?

Re: Using GeoIP2 in Flink Streaming

Stephan Ewen
Your anonymous MapFunction captures the local variable "DatabaseReader reader", so your code has to send that variable into the cluster together with the map function.
The class is not serializable, which means you cannot ship it like that.
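To illustrate why this fails, here is a minimal plain-Java sketch with no Flink dependency. It assumes that Flink's ClosureCleaner check behaves roughly like a Java serialization attempt on the function object, so Kryo is not involved at this step. `NonSerializableReader` is a hypothetical stand-in for `DatabaseReader`, and `SerializableMapper` stands in for Flink's `MapFunction`, which extends `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationDemo {

    // Stand-in for a Flink MapFunction (Flink's function interfaces extend Serializable).
    interface SerializableMapper<I, O> extends Serializable {
        O map(I value) throws Exception;
    }

    // Hypothetical stand-in for DatabaseReader: it does NOT implement Serializable.
    static class NonSerializableReader {
        String lookup(String ip) {
            return "country-of-" + ip;
        }
    }

    // Rough analogue of the ClosureCleaner check: try to write the object with Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static SerializableMapper<String, String> capturingMapper() {
        final NonSerializableReader reader = new NonSerializableReader();
        // The anonymous class stores 'reader' in a synthetic field, so serializing
        // the mapper also tries to serialize NonSerializableReader and fails.
        return new SerializableMapper<String, String>() {
            @Override
            public String map(String value) {
                return reader.lookup(value);
            }
        };
    }

    static SerializableMapper<String, String> nonCapturingMapper() {
        // Captures nothing, so it has no non-serializable fields.
        return new SerializableMapper<String, String>() {
            @Override
            public String map(String value) {
                return "echo-" + value;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturingMapper()));    // false
        System.out.println(isSerializable(nonCapturingMapper())); // true
    }
}
```

The anonymous class in your `main` (reported as `FlinkTest$1`) is in the same position as `capturingMapper()` here.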

If you control the code of the DatabaseReader, try to make the class serializable.

If you cannot change the code of the DatabaseReader, you can try the following:

  - (1) copy the file into a distributed file system
  - (2) use a RichMapFunction, and in open(), load the database from a stream read from the distributed file system.
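The pattern in (2) can be sketched in plain Java, assuming a hypothetical `NonSerializableReader` in place of `DatabaseReader` and a hypothetical path string. Only the path is serialized. The reader is kept in a `transient` field and built in `open()`, which mirrors what Flink does when it calls `open()` once per parallel task before any `map()` call. In a real job, `GeoMapper` would extend `RichMapFunction<String, String>` and override `open(Configuration)` and `close()`, building the reader with `new DatabaseReader.Builder(...).build()`. That part is not shown here, so treat this as a sketch rather than a drop-in implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OpenLifecycleDemo {

    // Hypothetical stand-in for GeoIP2's DatabaseReader: not Serializable.
    static class NonSerializableReader implements Closeable {
        private final String dbPath;
        NonSerializableReader(String dbPath) { this.dbPath = dbPath; }
        String lookup(String ip) { return ip + "@" + dbPath; }
        @Override public void close() { /* a real reader would release the file here */ }
    }

    // Mimics the shape of a RichMapFunction: ship the path, rebuild the reader per task.
    static class GeoMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String dbPath;                 // serialized with the function
        private transient NonSerializableReader reader; // skipped by serialization

        GeoMapper(String dbPath) { this.dbPath = dbPath; }

        // Flink would call open(Configuration) once per parallel instance.
        void open() { reader = new NonSerializableReader(dbPath); }

        String map(String value) { return reader.lookup(value); }

        void close() { if (reader != null) reader.close(); }
    }

    // Simulates shipping the function to a task: serialize it, then deserialize a copy.
    static GeoMapper ship(GeoMapper fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (GeoMapper) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical location of the database copied to a distributed file system.
        GeoMapper remote = ship(new GeoMapper("hdfs:///geo/GeoIP2-City.mmdb"));
        remote.open();
        System.out.println(remote.map("8.8.8.8")); // 8.8.8.8@hdfs:///geo/GeoIP2-City.mmdb
        remote.close();
    }
}
```

Because only the path travels with the function, each parallel task opens the database once in open() instead of once per record. Step (1) matters on a real cluster because every TaskManager has to be able to read that path. In local mode, a local file path would usually do.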


On Mon, Apr 4, 2016 at 4:52 PM, Zhun Shen <[hidden email]> wrote: