Table with KafkaJson problem with where/filter


leetc
Hi,

I am developing a streaming job with the Table API using a KafkaJsonTableSource.

The code snippet below works fine without any where/filter clause.

--- snippet of the code ---
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // The JSON field names and types
    String[] fieldNames = new String[] { "EventType", "MeterId", "Measurement", "EventDetails" };
    Class<?>[] fieldTypes = new Class<?>[] { String.class, Integer.class, Integer.class, String.class };

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProperties.setProperty("topic", "MyTableCEP2");

    KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
            "MyTableCEP2",
            kafkaProperties,
            fieldNames,
            fieldTypes);

    // Fail on missing JSON field
    kafkaTableSource.setFailOnMissingField(true);
    tableEnv.registerTableSource("kafkasource", kafkaTableSource);
    Table in2 = tableEnv.ingest("kafkasource");

    Table in = in2.select("Measurement, MeterId")
            .where("Measurement > 4");

    TableSink sink2 = new CsvTableSink("c://data//camel//b11.out", "|");
    // write the result Table to the TableSink
    in.writeToSink(sink2);

---------------------------------------------------------------------------
1. The code works well without the .where("Measurement > 4") clause.
2. With the WHERE clause, I encountered the following error:

Exception in thread "main" java.lang.NoSuchFieldError: BIG_DEC_TYPE_INFO
        at org.apache.flink.table.typeutils.TypeCheckUtils$.isNumeric(TypeCheckUtils.scala:46)
        at org.apache.flink.table.expressions.EqualTo.validateInput(comparison.scala:57)
        at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:88)
        at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
        at org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
        at org.apache.flink.table.plan.logical.LogicalNode.org$apache$flink$table$plan$logical$LogicalNode$$expressionPostOrderTransform$1(LogicalNode.scala:119)
        at org.apache.flink.table.plan.logical.LogicalNode$$anonfun$7.apply(LogicalNode.scala:129)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.flink.table.plan.logical.LogicalNode.expressionPostOrderTransform(LogicalNode.scala:137)
        at org.apache.flink.table.plan.logical.LogicalNode.validate(LogicalNode.scala:83)
        at org.apache.flink.table.plan.logical.Filter.validate(operators.scala:194)
        at org.apache.flink.table.api.Table.filter(table.scala:160)
        at org.apache.flink.table.api.Table.filter(table.scala:175)
        at org.apache.flink.table.api.Table.where(table.scala:203)
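
The bottom of the stack trace shows that where() just delegates to Table.filter(), so for reference here is a minimal, untested sketch of the same predicate written with filter() directly (only the new variable name is mine; everything else is taken from the snippet above):

--- sketch: filter() instead of where() ---
    // Same pipeline as above, but the predicate is expressed via filter(),
    // which where() calls internally according to the stack trace.
    Table filtered = tableEnv.ingest("kafkasource")
            .select("Measurement, MeterId")
            .filter("Measurement > 4");

    // Write the filtered result to the same CSV sink
    filtered.writeToSink(new CsvTableSink("c://data//camel//b11.out", "|"));
-------------------------------------------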

Any idea what went wrong?


-Thiam Cheng