Hello,
I recently started programming with Apache Flink API. I am trying to get input directly from kafka in a JSON format with the following code: private void kafkaConsumer(String server, String topic) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", server); properties.setProperty("group.id", "Demo"); stream = environment.addSource(new FlinkKafkaConsumer09<>(topic, new JSONDeserializationSchema(), properties)) .map(new MapFunction<ObjectNode, Event>() { @Override public Event map(ObjectNode value) throws Exception { return new Event(Integer.parseInt(value.get("id").asText()), value.get("user").asText(), value.get("action").asText(), value.get("ip").asText()); } }); } But I alwys get the following error: 17:56:46,335 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed. com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: [B@69a90966; line: 1, column: 1] at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148) at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3095) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3036) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215) at org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:38) at org.apache.flink.streaming.util.serialization.JSONDeserializationSchema.deserialize(JSONDeserializationSchema.java:30) at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227) at java.lang.Thread.run(Thread.java:745) What am I doing wrong? Attached follows the JSON sample that I am using. Thank you and Regards. log.json
Best Regards,
Pedro Chaves |
This sounds like it is not Flink related - it seems more like a Jackson/Json question than a Flink question.
On Thu, Oct 13, 2016 at 5:56 PM, PedroMrChaves <[hidden email]> wrote: Hello, |
Free forum by Nabble | Edit this page |