Table/SQL API to read and parse JSON, Java.

Posted by srikanth flink on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Table-SQL-API-to-read-and-parse-JSON-Java-tp31363.html

Hi there,

I'm following the link to read JSON data from Kafka and convert to table, programmatically. I'd try and succeed declarative using SQL client.

My Json data is nested like: {a:1,b,2,c:{x:1,y:2}}.
Code:
String schema = "{type: 'object', properties: {'message': {type: 'string'},'@timestamp': {type: 'string'}}}";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().getCheckpointTimeout();
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.connect(new Kafka().version("universal").topic("recon-data").startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(new Json().failOnMissingField(false).jsonSchema(schema).deriveSchema())
.withSchema(new Schema().field("message", Types.STRING()).field("@timestamp", Types.LOCAL_DATE_TIME()))
.inAppendMode().registerTableSource("reconTableS");

Table t = tableEnv.sqlQuery("select * from reconTableS");
DataStream<Row> out = tableEnv.toAppendStream(t, Row.class);
out.print();

try {
env.execute("Flink Example Json");
} catch (Exception e) {
e.printStackTrace();
}
}

pom.xml:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
</dependencies>

The code threw the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
        at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
        at kafka.flink.stream.list.match.ExampleJsonParser.main(ExampleJsonParser.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
        ... 12 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=recon-data
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=message
schema.0.type=VARCHAR
schema.1.name=@timestamp
schema.1.type=TIMESTAMP
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
        at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
        ... 20 more

Help me understand what am I missing