Not able to write data dynamically to Kafka Queue

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Not able to write data dynamically to Kafka Queue

pankhuri
This post was updated on .
I am reading from an upstream Kafka queue. For every message, I parse its key and, based on the value, want to put the data into the corresponding Kafka queue.

        // Source: consume the upstream Kafka topic(s) as JSON ObjectNode records.
        DataStream<ObjectNode> stream = env.addSource(new FlinkKafkaConsumer08<ObjectNode>(topics, new JSONDeserializationSchema(), properties));
        // First anonymous MapFunction: turn each ObjectNode into a String via asText().
        // NOTE(review): Jackson's asText() is documented to return an empty string for
        // container nodes such as ObjectNode -- confirm toString() was not intended.
        DataStream<String> sinkData = stream.map(new MapFunction<ObjectNode, String>() {
            @Override
            public String map(ObjectNode obj) {
            return obj.asText();
                }
        });
        // Alias for sinkData; this local is referenced inside the anonymous class below.
        DataStream<String> ds = sinkData;
        // Second anonymous MapFunction: inspects identifier.key of each record and, when it
        // equals "shell", calls ds.addSink(...) from inside map().
        // NOTE(review): this anonymous class references the local `ds` (a DataStream), so it
        // has to capture it. The reported exception names KafkaFlinkExample$2 and
        // SingleOutputStreamOperator as not serializable, which presumably is this captured
        // `ds` -- confirm.
        // NOTE(review): sinks are presumably meant to be attached while building the job
        // graph (e.g. on a filtered stream), not per record inside a user function -- verify
        // against the Flink DataStream documentation.
        DataStream<String> identifier = stream.map(new MapFunction<ObjectNode, String>() {
        @Override
        public String map(ObjectNode obj) {
                // Route only records whose identifier.key is "shell".
                if (obj.get("identifier").get("key").asText().equals("shell")){
                      ds.addSink(new FlinkKafkaProducer08<String>("sandbox.hortonworks.com:6667","shellIN", new SimpleStringSchema()));
                        }
                // NOTE(review): no return statement on any path although map() is declared
                // to return String.
        }
});
       
        // Print the mapped stream to stdout and submit the job under the given name.
        identifier.print();
        env.execute("Kafka-Flink Serialization Test");
    }

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.KafkaFlinkExample$2@79497d11 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
        at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
        at wikiedits.KafkaFlinkExample.main(KafkaFlinkExample.java:45)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator