Flink kafka connector with JAAS configurations crashed

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink kafka connector with JAAS configurations crashed

sundy

Hi ,all 

I use the code below to set kafka JASS config,   the serverConfig.jasspath is  /data/apps/spark/kafka_client_jaas.conf,   but on flink standalone deployment, it crashs. I am sure the kafka_client_jass.conf is valid, cause other applications(Spark streaming) are still working fine with it. So I think it may be not the problem caused by kafka 0.10 client.
System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");

Exceptions msgs are:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
	... 11 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
	at org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
	at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
	... 15 more


File content looks like below:

KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password=“xxxxxxxxxxx"; };

It seems like the kafka_client_jaas.conf file has been read, but the KafkaClient entry could not be resolved. That’s very strange, other applications with the same config are working fine. And I wrote a simple Java code to test the the file, it works fine too.


public static void main(String[] args) {
  Map<String, String> maps = new HashMap<>();
  System.setProperty("java.security.auth.login.config", "/data/apps/spark/kafka_client_jaas.conf");
  Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
  AppConfigurationEntry object[] = jassConfig.getAppConfigurationEntry("KafkaClient");
  for(AppConfigurationEntry entry : object){
    System.out.println(entry.getOptions());
  }
}




 



Reply | Threaded
Open this post in threaded view
|

Re: Flink kafka connector with JAAS configurations crashed

Nico Kruber
Hi,
I'm no expert on Kafka here, but as the tasks are run on the worker
nodes (where the TaskManagers are run), please double-check whether the
file under /data/apps/spark/kafka_client_jaas.conf on these nodes also
contains the same configuration as on the node running the JobManager,
i.e. an appropriate entry for 'KafkaClient'.


Regards
Nico

On 13/03/18 08:42, sundy wrote:

>
> Hi ,all 
>
> I use the code below to set kafka JASS config,   the
> serverConfig.jasspath is  /data/apps/spark/kafka_client_jaas.conf,   but
> on flink standalone deployment, it crashs. I am sure the
> kafka_client_jass.conf is valid, cause other applications(Spark
> streaming) are still working fine with it. So I think it may be not the
> problem caused by kafka 0.10 client.
>
> System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
> properties.setProperty("security.protocol", "SASL_PLAINTEXT");
> properties.setProperty("sasl.mechanism", "PLAIN");
>
>
> Exceptions msgs are:
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> ... 11 more
> Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
> at org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
> at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
> at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
> ... 15 more
>
>
>
> File content looks like below:
>
> KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule
> required username="admin" password=“xxxxxxxxxxx"; };
>
> It seems like the kafka_client_jaas.conf file has been read, but the
> KafkaClient entry could not be resolved. That’s very strange, other
> applications with the same config are working fine. And I wrote a simple
> Java code to test the the file, it works fine too.
>
>
> public static void main(String[] args) {
>   Map<String, String> maps = new HashMap<>();
>   System.setProperty("java.security.auth.login.config",
> "/data/apps/spark/kafka_client_jaas.conf");
>   Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
>   AppConfigurationEntry object[] =
> jassConfig.getAppConfigurationEntry("KafkaClient");
>   for(AppConfigurationEntry entry : object){
>     System.out.println(entry.getOptions());
>   }
> }
>
>
>
>
>  
>
>
>


signature.asc (201 bytes) Download Attachment