Hi, I hit the following error when I try to use kafka connector in flink table api. There's very little document about how to use kafka connector in flink table api, could anyone help me on that ? Thanks Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'event_ts' could not be resolved by the field mapping. at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491) at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521) at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521) at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127) at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33) at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150) at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68) And here's the source code: case class Record(status: String, direction: String, var event_ts: Timestamp) 0封新邮件 回复 |
Hi jeff, We need a different field name for the rowtime indicator, something looks like: new Schema() Furthermore, we should define another sink schema which contains no rowtime definitions, since currently time attributes and custom field mappings are not supported yet for sink. val sinkSchema = Btw, a unified api for source and sink is under discussion now. More details here[1] Best, Hequn On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang <[hidden email]> wrote:
|
Thanks hequn, it is very helpful On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng <[hidden email]> wrote:
Best Regards
Jeff Zhang |
Free forum by Nabble | Edit this page |