Hi team,
I am passing a security enabled kafka consumer properties to FlinkKafkaConsumer but keep getting this error java.io.NotSerializableException? what is the best way to handle this? I use Flink 1.7.1 and here is the consumer property that produces the exception props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";")); stacktrace Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397) at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69) Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at java.util.Hashtable.writeObject(Hashtable.java:1157) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) ... 5 more |
The exception is very clear that the SourceFunction should be serializable. Password is not serializable. You can try to set the kafka consumer properties such as this: props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";"); The String value will be parsed to Password object.(refer to the method org.apache.kafka.common.config.ConfigDef.parseType) Regards, Dian
smime.p7s (3K) Download Attachment |
Thanks, it works On Wed, 26 Dec 2018 at 10:07 fudian.fd <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |