Hi there,
In my case, I want to use GeoIP2 in Flink Streaming, I know I need to serialize geoip2 related classes using Kryo. But I did know how to do it. Flink version: 1.0.0 Kafka version: 0.9.0.0 Deploy Mode: Local My demo code as below: File database = new File(“/home/user/GeoIP2-City.mmdb"); final DatabaseReader reader = new DatabaseReader.Builder(database).build(); DataStream<String> messageStream = env .addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties)); messageStream .rebalance() .map(new MapFunction<String, String>() { public String map(String value) throws Exception { InetAddress ipAddress = InetAddress.getByName(value); CityResponse response = reader.city(ipAddress); Country country = response.getCountry(); return "Kafka and Flink says: " + value + " " + country; } }).print(); env.execute(); I got the error below: Object FlinkTest$1@7c7d3c46 not serializable org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99) org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61) org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219) org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160) org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505) FlinkTest.main(FlinkTest.java:36) Any one can help me ?
|
Your code has to send the variable "DatabaseReader reader" into the cluster together with the map function. The class is not serializable, which means you cannot ship it like that. If you control the code of the DatabaseReader , try to make the class serializable. If you cannot change the code of the DatabaseReader DatabaseReader, you can try to do the following: - (1) copy the file into a distributed filesystem - (2) use a RichMapFunction, and in open(), you load the database from the distributed file system stream. On Mon, Apr 4, 2016 at 4:52 PM, Zhun Shen <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |