Field could not be resolved by the field mapping when using Kafka connector

Field could not be resolved by the field mapping when using Kafka connector

Jeff Zhang

Hi,

I hit the following error when I try to use the Kafka connector in the Flink Table API. There is very little documentation about how to use the Kafka connector in the Table API; could anyone help me with that? Thanks.

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'event_ts' could not be resolved by the field mapping.
at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)

And here's the source code:



case class Record(status: String, direction: String, var event_ts: Timestamp)

def main(args: Array[String]): Unit = {
  val senv = StreamExecutionEnvironment.getExecutionEnvironment
  senv.setParallelism(1)
  senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val data: DataStream[Record] = ...
  val tEnv = TableEnvironment.getTableEnvironment(senv)
  tEnv
    // declare the external system to connect to
    .connect(
      new Kafka()
        .version("0.11")
        .topic("processed5.events")
        .startFromEarliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092"))
    .withFormat(new Json()
      .failOnMissingField(false)
      .deriveSchema())
    .withSchema(new Schema()
      .field("status", Types.STRING)
      .field("direction", Types.STRING)
      .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
        new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending()))
    // specify the update-mode for streaming tables
    .inAppendMode()
    // register as source, sink, or both and under a name
    .registerTableSourceAndSink("MyUserTable")

  tEnv.fromDataStream(data).insertInto("MyUserTable")
}

Re: Field could not be resolved by the field mapping when using Kafka connector

Hequn Cheng
Hi Jeff,

We need a different field name for the rowtime indicator, since the logical rowtime attribute cannot reuse the name of the physical field it is derived from. Something like:
      new Schema()
        .field("status", Types.STRING)
        .field("direction", Types.STRING)
        .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
        new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())

Furthermore, we should define a separate sink schema that contains no rowtime definition, since time attributes and custom field mappings are not yet supported for sinks.
    val sinkSchema =
      new Schema()
        .field("status", Types.STRING)
        .field("direction", Types.STRING)
        .field("rowtime", Types.SQL_TIMESTAMP)

BTW, a unified API for sources and sinks is under discussion now. More details here [1].

Best, Hequn

 

On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang <[hidden email]> wrote:



Re: Field could not be resolved by the field mapping when using Kafka connector

Jeff Zhang
Thanks Hequn, that is very helpful.

On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng <[hidden email]> wrote:



--
Best Regards

Jeff Zhang