Hi, I am running a Flink app while reading Kafka records with JSON format. And the connect code is like the following: tableEnv.connect( new Kafka() .version(kafkaInstance.getVersion()) .topic(chooseKafkaTopic(initPack.clusterMode)) .property("bootstrap.servers", kafkaInstance.getBrokerList()) .property("group.id", initPack.jobName) .startFromEarliest() ).withSchema( new Schema() // EVENT_TIME .field("rowtime", Types.SQL_TIMESTAMP).rowtime( new Rowtime() .timestampsFromField("time") .watermarksPeriodicBounded(1000) ) .field("type", Types.STRING) .field("event", Types.STRING) .field("user_id", Types.STRING) .field("distinct_id", Types.STRING) .field("project", Types.STRING) .field("recv_time", Types.SQL_TIMESTAMP) .field("properties", Types.ROW_NAMED( new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" }, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING) ) ).withFormat( new Json().failOnMissingField(false) .deriveSchema() ) .inAppendMode()
.registerTableSource(getTableName()); However, the application throws the following Exception which really confused me. From the code above, the field types are only Types.STRING or Types.SQL_TIMESTAMP. Not sure which data field can run to this. Wanner some help from community. Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types. at DataStreamSinkConversion$5.map(Unknown Source) at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55) at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37) at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28) at DataStreamSourceConversion$2.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.
|
hi sunfulin,
which flink version are you using ? best, godfrey sunfulin <[hidden email]> 于2020年1月10日周五 下午1:50写道:
|
Hi sunfulin, Looks like the error is happened in sink instead of source. Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types. at DataStreamSinkConversion$5.map(Unknown Source) So the point is how did you write to sink. Can you share these codes? Best, Jingsong Lee On Fri, Jan 10, 2020 at 2:58 PM godfrey he <[hidden email]> wrote:
Best, Jingsong Lee |
Hi, Thanks for the reply. Tends out that I am using table2datastream and tableEnv.sqlUpdate in the seem time and the exception thus is thrown. My mistake. At 2020-01-10 17:11:02, "Jingsong Li" <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |