Flink Deserialisation JSON to Java;

Aissa Elaffani
Hello,
Could you please share some demos or examples of deserialization with Flink?
I need to consume Kafka messages produced by sensors in JSON format.
Here is my JSON message:
{"date": "2018-05-31 15:10", "main": {"ph": 5.0, "whc": 60.0, "temperature": 9.5, "humidity": 96}, "id": 2582, "coord": {"lat": 57.79, "lon": -54.58}}

Re: Flink Deserialisation JSON to Java;

Jark Wu-3
Hi Aissa,

You can easily do this with Flink SQL by defining a Kafka table using Flink DDL:

CREATE TABLE sensor_logs (
    `date` STRING,
    `main` ROW<
        `ph` DOUBLE,
        `whc` DOUBLE,
        `temperature` DOUBLE,
        `humidity` DOUBLE>,
    `id` BIGINT,
    `coord` ROW<
         `lat` DOUBLE,
         `lon` DOUBLE>
) WITH (
    'connector.type' = 'kafka',  -- using kafka connector
    'connector.version' = 'universal',  -- kafka version, universal supports Kafka 0.11+
    'connector.topic' = 'sensor_logs',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- reading from the beginning
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- the data format is json
);

And then you can use a `SELECT * FROM sensor_logs` query to get the structured data.
You can refer to the documentation for more details [1]. Please remember to add the
required dependency jars to your cluster [2].
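
As a rough sketch of what a more specific query might look like, the nested ROW fields can be read with dot notation, and the `date` string can be parsed into a timestamp. This assumes the blink planner, where TO_TIMESTAMP with a format argument should be available; the column aliases (reading_time and so on) are only illustrative names, not part of the table above.

```sql
-- Illustrative query against the sensor_logs table defined by the DDL above.
-- Assumption: blink planner, so that TO_TIMESTAMP(string, format) is available.
SELECT
    id,
    TO_TIMESTAMP(`date`, 'yyyy-MM-dd HH:mm') AS reading_time,  -- `date` is a reserved word, so it stays quoted
    `main`.`ph`          AS ph,
    `main`.`whc`         AS whc,
    `main`.`temperature` AS temperature,
    `main`.`humidity`    AS humidity,
    `coord`.`lat`        AS lat,
    `coord`.`lon`        AS lon
FROM sensor_logs;
```

Flattening the nested fields this way gives one plain column per sensor value, which is usually easier to map onto a flat Java class or to write to a sink.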

Best,
Jark


On Tue, 5 May 2020 at 07:54, Aissa Elaffani <[hidden email]> wrote:
Hello,
Please can you share with me, some demos or examples of deserialization with flink.
I need to consume some kafka message produced by sensors in JSON format.
here is my JSON message :
{"date": "2018-05-31 15:10", "main": {"ph": 5.0, "whc": 60.0, "temperature": 9.5, "humidity": 96}, "id": 2582, "coord": {"lat": 57.79, "lon": -54.58}}