HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

Jim Chen
Hi, everyone!

When I use Flink 1.10 to define a table, I want to declare a JSON array field as the STRING type. However, the query result is null when I execute the program.
The detailed code is as follows:

package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
//        bsEnv.registerFunction("explode3", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable3 (\n" +
                "    action STRING\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json',\n" +
//                "    'format.derive-schema' = 'true',\n" +
                "    'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
                ")";
        System.out.println(ddlSource);
        bsEnv.sqlUpdate(ddlSource);

        Table table = bsEnv.sqlQuery("select * from actionTable3");
//        Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print();// the result is null

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}
Reply | Threaded
Open this post in threaded view
|

Re: HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

Benchao Li-2
Hi Jim,

This is a known issue [1]. Could you check whether that issue covers your requirement?


Jim Chen <[hidden email]> 于2020年7月6日周一 下午1:28写道:
Hi, everyone!

When I use Flink 1.10 to define a table, I want to declare a JSON array field as the STRING type. However, the query result is null when I execute the program.
The detailed code is as follows:

package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * kafka topic: test_action
 *
 * kafka message:
 *   {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }
 */
public class Problem2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings);
//        bsEnv.registerFunction("explode3", new ExplodeFunction());

        String ddlSource = "CREATE TABLE actionTable3 (\n" +
                "    action STRING\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = '0.11',\n" +
                "    'connector.topic' = 'test_action',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'update-mode' = 'append',\n" +
                "    'format.type' = 'json',\n" +
//                "    'format.derive-schema' = 'true',\n" +
                "    'format.json-schema' = '{\"type\": \"object\", \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
                ")";
        System.out.println(ddlSource);
        bsEnv.sqlUpdate(ddlSource);

        Table table = bsEnv.sqlQuery("select * from actionTable3");
//        Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(`action`)) as T(`word`)");
        table.printSchema();
        bsEnv.toAppendStream(table, Row.class)
                .print();// the result is null

        bsEnv.execute("ARRAY tableFunction Problem");
    }
}


--

Best,
Benchao Li