I have the following code:

```java
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
        "test-kafka=topic",
        new SimpleStringSchema(),
        properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
        .map(new MapFunction<MyCustomClass, String>() {
            @Override
            public String map(MyCustomClass message) {
                logger.info("--- Received message : " + message.toString());
                return message.toString();
            }
        });

streamEnv.execute("Published messages");
```
MyCustomClassDeserializer is implemented as:

```java
public MyCustomClass deserialize(String s, byte[] bytes) {
    return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
}
```

When I run this program locally, I get this error:

```
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.
```

Why do I get this error?
Hi,

Can you provide the full stack trace of your exception?

Most likely, the error is caused by this setting:

```java
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        MyCustomClassDeserializer.class.getName());
```

You need to use Flink's DeserializationSchema.

On Mon, May 4, 2020 at 10:26 AM Manish G <[hidden email]> wrote:
Thanks. It worked by introducing a custom DeserializationSchema.

On Mon, May 4, 2020 at 3:04 PM Robert Metzger <[hidden email]> wrote:
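For readers landing on this thread: the fix described above could look roughly like the sketch below. It implements Flink's `DeserializationSchema` interface instead of a Kafka `Deserializer`, so Flink can report the correct produced type (`MyCustomClass` rather than `String`, which is what `SimpleStringSchema` declares). The class name `MyCustomClassSchema` is an assumption, and `MyCustomClass` / `JsonUtil` are taken from the original post; only the Flink interface methods are real API.

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

// Hypothetical sketch of the custom DeserializationSchema that resolved the
// issue; assumes MyCustomClass and JsonUtil from the original post.
public class MyCustomClassSchema implements DeserializationSchema<MyCustomClass> {

    @Override
    public MyCustomClass deserialize(byte[] bytes) throws IOException {
        // Reuse the JSON conversion from the original Kafka deserializer.
        return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
    }

    @Override
    public boolean isEndOfStream(MyCustomClass nextElement) {
        return false; // the Kafka topic is an unbounded stream
    }

    @Override
    public TypeInformation<MyCustomClass> getProducedType() {
        // This is what fixes the InvalidTypesException: Flink now knows the
        // source produces MyCustomClass, not a basic String type.
        return TypeInformation.of(MyCustomClass.class);
    }
}
```

It would then replace `SimpleStringSchema` in the consumer, e.g. `new FlinkKafkaConsumer<>("test-kafka=topic", new MyCustomClassSchema(), properties)`; the `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG` property becomes unnecessary, since Flink overrides Kafka's value deserializer with its own byte-array passthrough.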