InvalidTypesException: Input mismatch while consuming Kafka messages

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

InvalidTypesException: Input mismatch while consuming Kafka messages

Manish G
I have following code:

//////////////////////
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka=topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

///////
MyCustomClassDeserializer is implemented as:

public MyCustomClass deserialize(String s, byte[] bytes) {
        return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
    }

When I run this program locally, I get error:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

Why I get this error?
Reply | Threaded
Open this post in threaded view
|

Re: InvalidTypesException: Input mismatch while consuming Kafka messages

rmetzger0
Hi,
Can you provide the full stack trace of your exception?
Most likely, the error is caused by this setting:

properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

You need to use Flink's DeserializationSchema.

On Mon, May 4, 2020 at 10:26 AM Manish G <[hidden email]> wrote:
I have following code:

//////////////////////
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka=topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

///////
MyCustomClassDeserializer is implemented as:

public MyCustomClass deserialize(String s, byte[] bytes) {
        return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
    }

When I run this program locally, I get error:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

Why I get this error?
Reply | Threaded
Open this post in threaded view
|

Re: InvalidTypesException: Input mismatch while consuming Kafka messages

Manish G
Thanks. It worked by introducing a custom DeserializationSchema.

On Mon, May 4, 2020 at 3:04 PM Robert Metzger <[hidden email]> wrote:
Hi,
Can you provide the full stack trace of your exception?
Most likely, the error is caused by this setting:

properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

You need to use Flink's DeserializationSchema.

On Mon, May 4, 2020 at 10:26 AM Manish G <[hidden email]> wrote:
I have following code:

//////////////////////
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka=topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

///////
MyCustomClassDeserializer is implemented as:

public MyCustomClass deserialize(String s, byte[] bytes) {
        return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
    }

When I run this program locally, I get error:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

Why I get this error?