Kinesis stream and serialization schemas

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Kinesis stream and serialization schemas

Yoandy Rodríguez

Hi everyone,

As I've mention in previous emails, we're currently exploring flink as a substitute for some in house products. One of these products sends JSON

data to a Kinesis Data Stream, another product process the records after some time.

We've tried to set up the Kinesis producer like this:

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);

But on the other application we kept getting some weird binary data, so right now we're using the following

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new KinesisSerializationSchema<String>() {
            private static final long serialVersionUID = -3435842401751414891L;
            @Override
            public ByteBuffer serialize(String element) {
                return ByteBuffer.wrap(element.getBytes());
            }
            @Override
            public String getTargetStream(String element) {
                return null;
            }
        }, producerConfig);

But that's not working either. What are we doing wrong?.

Thanks in advance

-- 
Best Regards
Yoandy Rodríguez