I am reading from an upstream Kafka topic and, for each message, parsing its key; based on the key's value I want to write the message to a corresponding downstream Kafka topic. My attempt is below, followed by the exception it throws.
DataStream<ObjectNode> stream = env.addSource(
        new FlinkKafkaConsumer08<ObjectNode>(topics, new JSONDeserializationSchema(), properties));

DataStream<String> sinkData = stream.map(new MapFunction<ObjectNode, String>() {
    @Override
    public String map(ObjectNode obj) {
        return obj.asText();
    }
});

final DataStream<String> ds = sinkData;

DataStream<String> identifier = stream.map(new MapFunction<ObjectNode, String>() {
    @Override
    public String map(ObjectNode obj) {
        String key = obj.get("identifier").get("key").asText();
        if (key.equals("shell")) {
            // attach a sink from inside the map function
            ds.addSink(new FlinkKafkaProducer08<String>(
                    "sandbox.hortonworks.com:6667", "shellIN", new SimpleStringSchema()));
        }
        return key;
    }
});

identifier.print();
env.execute("Kafka-Flink Serialization Test");
}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.KafkaFlinkExample$2@79497d11 not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:160)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:505)
at wikiedits.KafkaFlinkExample.main(KafkaFlinkExample.java:45)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
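If I understand the stack trace correctly, the root cause is that the anonymous MapFunction captures the outer DataStream `ds` in its closure, and a DataStream (here a SingleOutputStreamOperator) is not serializable, so Flink's ClosureCleaner rejects the whole function. A minimal plain-Java sketch (no Flink on the classpath; all class names here are illustrative stand-ins) reproduces the same failure mode:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {
    // Stand-in for the non-serializable DataStream / SingleOutputStreamOperator
    static class NotSerializableStream { }

    public static void main(String[] args) throws Exception {
        final NotSerializableStream ds = new NotSerializableStream();

        // A serializable "function" that captures `ds`, just as the anonymous
        // MapFunction above captures the outer DataStream via its closure.
        Runnable mapFn = (Runnable & Serializable) () -> System.out.println(ds);

        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Serializing the closure drags the captured field along with it,
            // which is essentially the check Flink's ClosureCleaner performs.
            out.writeObject(mapFn);
            System.out.println("serialized OK");
        } catch (NotSerializableException e) {
            System.out.println("failed: " + e);
        }
    }
}
```

So is the right fix to attach the sinks at the top level of the program instead, e.g. by filtering the stream on the key and calling addSink on the filtered stream outside any user function?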