Json KAFKA producer

Json KAFKA producer

Luigi Sgaglione
Hi,

I'm trying to create a Flink example with a Kafka consumer and producer using the JSON data format. I can consume and process JSON data published on a Kafka topic, but I cannot publish the results.

The problem is that I don't know which serialization schema to use to publish an ObjectNode (Jackson).


This is an excerpt of my test code:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
....

FlinkKafkaConsumer011<ObjectNode> myConsumer = new FlinkKafkaConsumer011<>("test2", new JSONDeserializationSchema(), properties);
myConsumer.setStartFromEarliest();  

DataStreamSource<ObjectNode> stream = env.addSource(myConsumer);
SingleOutputStreamOperator<ObjectNode> out1 = stream.filter(new FilterFunction<ObjectNode>() {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean filter(ObjectNode arg0) throws Exception {
        // Keep every record whose "value" field is not "1".
        String temp = arg0.get("value").asText();
        return !temp.equals("1");
    }
});
FlinkKafkaProducer011<ObjectNode> producer = new FlinkKafkaProducer011<ObjectNode>("192.168.112.128:9092", "flinkOut", XXX);
out1.addSink(producer);

Can you help me to understand how I can publish an ObjectNode?

Thanks


Re: Json KAFKA producer

Fabian Hueske-2
Hi,

SerializationSchema is a public interface that you can implement.
It has a single method to turn an object into a byte array. 

I would suggest implementing your own SerializationSchema.
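
A minimal sketch of what such an implementation could look like. To keep it compilable without Flink on the classpath, it declares a local stand-in interface with the same single method; in a real job you would drop the stand-in and implement org.apache.flink.api.common.serialization.SerializationSchema instead. The names JsonSchemaSketch and ToStringSchema are hypothetical, and the sketch assumes that toString() on the element returns its JSON text, which is how Jackson's ObjectNode behaves.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class JsonSchemaSketch {

    // Stand-in (assumption) for Flink's SerializationSchema so the sketch is
    // self-contained. In a Flink job, remove this interface and import
    // org.apache.flink.api.common.serialization.SerializationSchema.
    public interface SerializationSchema<T> extends Serializable {
        byte[] serialize(T element);
    }

    // Hypothetical schema: writes the element's toString() text as UTF-8 bytes.
    // For an ObjectNode, toString() is assumed to be the JSON representation.
    public static class ToStringSchema<T> implements SerializationSchema<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(T element) {
            return element.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        // A plain String stands in for an ObjectNode here.
        SerializationSchema<String> schema = new ToStringSchema<>();
        byte[] bytes = schema.serialize("{\"value\":\"2\"}");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

With the real Flink interface in place, an instance such as new ToStringSchema<ObjectNode>() would go where XXX stands in the producer constructor of the original post.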

Best, Fabian

2018-04-11 15:56 GMT+02:00 Luigi Sgaglione <[hidden email]>: