Null result cannot be used for atomic types

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Null result cannot be used for atomic types

sunfulin

Hi, I am running a Flink app while reading Kafka records with JSON format. And the connect code is like the following: 


tableEnv.connect(

    new Kafka()

                .version(kafkaInstance.getVersion())

                .topic(chooseKafkaTopic(initPack.clusterMode))

                .property("bootstrap.servers", kafkaInstance.getBrokerList())

                .property("group.id", initPack.jobName)

                .startFromEarliest()  

).withSchema(

    new Schema()

            // EVENT_TIME 

            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(

                new Rowtime()

                    .timestampsFromField("time")

                    .watermarksPeriodicBounded(1000)

            )

            .field("type", Types.STRING)

            .field("event", Types.STRING)

            .field("user_id", Types.STRING)

            .field("distinct_id", Types.STRING)

            .field("project", Types.STRING)

            .field("recv_time", Types.SQL_TIMESTAMP)

            .field("properties", Types.ROW_NAMED(

                    new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },

                    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)

            )

).withFormat(

    new Json().failOnMissingField(false)

            .deriveSchema()

)

.inAppendMode()

.registerTableSource(getTableName());



However, the application throws the following Exception which really confused me. From the code above, the field types are only Types.STRING or Types.SQL_TIMESTAMP. 

Not sure which data field can run to this. Wanner some help from community.


Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.




 

Reply | Threaded
Open this post in threaded view
|

Re: Null result cannot be used for atomic types

godfrey he
hi sunfulin,

which flink version are you using ?

best,
godfrey

sunfulin <[hidden email]> 于2020年1月10日周五 下午1:50写道:

Hi, I am running a Flink app while reading Kafka records with JSON format. And the connect code is like the following: 


tableEnv.connect(

    new Kafka()

                .version(kafkaInstance.getVersion())

                .topic(chooseKafkaTopic(initPack.clusterMode))

                .property("bootstrap.servers", kafkaInstance.getBrokerList())

                .property("group.id", initPack.jobName)

                .startFromEarliest()  

).withSchema(

    new Schema()

            // EVENT_TIME 

            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(

                new Rowtime()

                    .timestampsFromField("time")

                    .watermarksPeriodicBounded(1000)

            )

            .field("type", Types.STRING)

            .field("event", Types.STRING)

            .field("user_id", Types.STRING)

            .field("distinct_id", Types.STRING)

            .field("project", Types.STRING)

            .field("recv_time", Types.SQL_TIMESTAMP)

            .field("properties", Types.ROW_NAMED(

                    new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },

                    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)

            )

).withFormat(

    new Json().failOnMissingField(false)

            .deriveSchema()

)

.inAppendMode()

.registerTableSource(getTableName());



However, the application throws the following Exception which really confused me. From the code above, the field types are only Types.STRING or Types.SQL_TIMESTAMP. 

Not sure which data field can run to this. Wanner some help from community.


Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.




 

Reply | Threaded
Open this post in threaded view
|

Re: Null result cannot be used for atomic types

Jingsong Li
Hi sunfulin,

Looks like the error is happened in sink instead of source.

Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)


So the point is how did you write to sink. Can you share these codes?


Best,

Jingsong Lee


On Fri, Jan 10, 2020 at 2:58 PM godfrey he <[hidden email]> wrote:
hi sunfulin,

which flink version are you using ?

best,
godfrey

sunfulin <[hidden email]> 于2020年1月10日周五 下午1:50写道:

Hi, I am running a Flink app while reading Kafka records with JSON format. And the connect code is like the following: 


tableEnv.connect(

    new Kafka()

                .version(kafkaInstance.getVersion())

                .topic(chooseKafkaTopic(initPack.clusterMode))

                .property("bootstrap.servers", kafkaInstance.getBrokerList())

                .property("group.id", initPack.jobName)

                .startFromEarliest()  

).withSchema(

    new Schema()

            // EVENT_TIME 

            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(

                new Rowtime()

                    .timestampsFromField("time")

                    .watermarksPeriodicBounded(1000)

            )

            .field("type", Types.STRING)

            .field("event", Types.STRING)

            .field("user_id", Types.STRING)

            .field("distinct_id", Types.STRING)

            .field("project", Types.STRING)

            .field("recv_time", Types.SQL_TIMESTAMP)

            .field("properties", Types.ROW_NAMED(

                    new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },

                    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)

            )

).withFormat(

    new Json().failOnMissingField(false)

            .deriveSchema()

)

.inAppendMode()

.registerTableSource(getTableName());



However, the application throws the following Exception which really confused me. From the code above, the field types are only Types.STRING or Types.SQL_TIMESTAMP. 

Not sure which data field can run to this. Wanner some help from community.


Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.




 



--
Best, Jingsong Lee
Reply | Threaded
Open this post in threaded view
|

Re:Re: Null result cannot be used for atomic types

sunfulin
Hi,
Thanks for the reply. Tends out that I am using table2datastream and tableEnv.sqlUpdate in the seem time and the exception thus is thrown. My mistake.





At 2020-01-10 17:11:02, "Jingsong Li" <[hidden email]> wrote:

Hi sunfulin,

Looks like the error is happened in sink instead of source.

Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)


So the point is how did you write to sink. Can you share these codes?


Best,

Jingsong Lee


On Fri, Jan 10, 2020 at 2:58 PM godfrey he <[hidden email]> wrote:
hi sunfulin,

which flink version are you using ?

best,
godfrey

sunfulin <[hidden email]> 于2020年1月10日周五 下午1:50写道:

Hi, I am running a Flink app while reading Kafka records with JSON format. And the connect code is like the following: 


tableEnv.connect(

    new Kafka()

                .version(kafkaInstance.getVersion())

                .topic(chooseKafkaTopic(initPack.clusterMode))

                .property("bootstrap.servers", kafkaInstance.getBrokerList())

                .property("group.id", initPack.jobName)

                .startFromEarliest()  

).withSchema(

    new Schema()

            // EVENT_TIME 

            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(

                new Rowtime()

                    .timestampsFromField("time")

                    .watermarksPeriodicBounded(1000)

            )

            .field("type", Types.STRING)

            .field("event", Types.STRING)

            .field("user_id", Types.STRING)

            .field("distinct_id", Types.STRING)

            .field("project", Types.STRING)

            .field("recv_time", Types.SQL_TIMESTAMP)

            .field("properties", Types.ROW_NAMED(

                    new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },

                    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)

            )

).withFormat(

    new Json().failOnMissingField(false)

            .deriveSchema()

)

.inAppendMode()

.registerTableSource(getTableName());



However, the application throws the following Exception which really confused me. From the code above, the field types are only Types.STRING or Types.SQL_TIMESTAMP. 

Not sure which data field can run to this. Wanner some help from community.


Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.

 at DataStreamSinkConversion$5.map(Unknown Source)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)

 at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)

 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)

 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)

 at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)

 at DataStreamSourceConversion$2.processElement(Unknown Source)

 at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)

 at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)

 at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)

 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)

 at org.apache.flink.streaming.




 



--
Best, Jingsong Lee