Hi,
I tried developing a streaming job with the Table API using KafkaJsonTableSource. The code snippet below works well without any where/filter clause.

--- snippet of the code ---
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// The JSON field names and types
String[] fieldNames = new String[] { "EventType", "MeterId", "Measurement", "EventDetails" };
Class<?>[] fieldTypes = new Class<?>[] { String.class, Integer.class, Integer.class, String.class };

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("topic", "MyTableCEP2");

KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
    "MyTableCEP2",
    kafkaProperties,
    fieldNames,
    fieldTypes);

// Fail on missing JSON field
kafkaTableSource.setFailOnMissingField(true);

tableEnv.registerTableSource("kafkasource", kafkaTableSource);
Table in2 = tableEnv.ingest("kafkasource");
Table in = in2.select("Measurement,MeterId")
    .where("Measurement > 4 ");

TableSink sink2 = new CsvTableSink("c://data//camel//b11.out", "|");

// write the result Table to the TableSink
in.writeToSink(sink2);
---------------------------------------------------------------------------

1. The code works well without the .where("Measurement > 4 ") clause.
2. With the where clause, I encountered the following error:

Exception in thread "main" java.lang.NoSuchFieldError: BIG_DEC_TYPE_INFO
    at org.apache.flink.table.typeutils.TypeCheckUtils$.isNumeric(TypeCheckUtils.scala:46)
    at org.apache.flink.table.expressions.EqualTo.validateInput(comparison.scala:57)
    at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:88)
    at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
    at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
    at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
    at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:129)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
    at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
    at org.apache.flink.table.plan.logical.Filter.validate(operators.scala:194)
    at org.apache.flink.table.api.Table.filter(table.scala:160)
    at org.apache.flink.table.api.Table.filter(table.scala:175)
    at org.apache.flink.table.api.Table.where(table.scala:203)

Any idea what went wrong?

-Thiam Cheng