Unable to serialize org.apache.kafka.common.config.types.Password

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Unable to serialize org.apache.kafka.common.config.types.Password

tao xiao
Hi team,

I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this?

I use Flink 1.7.1 and here is the consumer property that produces the exception

props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";"));

stacktrace
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.Hashtable.writeObject(Hashtable.java:1157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 5 more

Reply | Threaded
Open this post in threaded view
|

Re: Unable to serialize org.apache.kafka.common.config.types.Password

fudian.fd
The exception is very clear that the SourceFunction should be serializable. Password is not serializable. You can try to set the kafka consumer properties such as this:

props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";");

The String value will be parsed to Password object.(refer to the method org.apache.kafka.common.config.ConfigDef.parseType)

Regards,
Dian


在 2018年12月25日,下午11:04,tao xiao <[hidden email]> 写道:

Hi team,

I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this?

I use Flink 1.7.1 and here is the consumer property that produces the exception

props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";"));

stacktrace
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.Hashtable.writeObject(Hashtable.java:1157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 5 more



smime.p7s (3K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Unable to serialize org.apache.kafka.common.config.types.Password

tao xiao
Thanks, it works 

On Wed, 26 Dec 2018 at 10:07 fudian.fd <[hidden email]> wrote:
The exception is very clear that the SourceFunction should be serializable. Password is not serializable. You can try to set the kafka consumer properties such as this:

props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";");

The String value will be parsed to Password object.(refer to the method org.apache.kafka.common.config.ConfigDef.parseType)

Regards,
Dian


在 2018年12月25日,下午11:04,tao xiao <[hidden email]> 写道:

Hi team,

I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this?

I use Flink 1.7.1 and here is the consumer property that produces the exception

props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";"));

stacktrace
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.Hashtable.writeObject(Hashtable.java:1157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 5 more