kerberos yarn - failure in long running streaming application


vprabhu@gmail.com
Hi,

I am running Flink 1.3.2 on YARN (Cloudera 2.6.0-cdh5.7.6). The application streams data from Kafka, groups by key, creates a session window, and writes to HDFS using a rich window function in the "window.apply" method.

The rich window function creates the sequence file as follows:

SequenceFile.createWriter(
                conf,
                new Option[] {
                        Writer.file(new Path("flink-output/" + filePath)),
                        Writer.compression(CompressionType.BLOCK,
                                new DefaultCodec()),
                        Writer.keyClass(BytesWritable.class),
                        Writer.valueClass(BytesWritable.class) })

The "conf" is created in the "open" method thus

conf = HadoopFileSystem.getHadoopConfiguration();
        for (Map.Entry<String, String> entry : parameters.toMap().entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }

where "parameters" is the org.apache.flink.configuration.Configuration object that is passed as an argument to the "open" method.
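
For reference, here is a minimal self-contained sketch of how the two snippets above could fit together in a rich window function. The class name, the String element type, and the per-window file naming are illustrative assumptions, not taken from the actual job:

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.compress.DefaultCodec;

// Hedged sketch of the setup described above; the element type and the
// file naming scheme are hypothetical.
public class SequenceFileWindowSink
        extends RichWindowFunction<String, Void, String, TimeWindow> {

    private transient org.apache.hadoop.conf.Configuration conf;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Merge Flink's configuration entries into the Hadoop configuration.
        conf = HadoopFileSystem.getHadoopConfiguration();
        for (Map.Entry<String, String> entry : parameters.toMap().entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable<String> values,
                      Collector<Void> out) throws Exception {
        String filePath = key + "-" + window.getEnd(); // hypothetical naming
        // One block-compressed SequenceFile per session window.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                Writer.file(new Path("flink-output/" + filePath)),
                Writer.compression(CompressionType.BLOCK, new DefaultCodec()),
                Writer.keyClass(BytesWritable.class),
                Writer.valueClass(BytesWritable.class))) {
            for (String value : values) {
                writer.append(new BytesWritable(key.getBytes()),
                        new BytesWritable(value.getBytes()));
            }
        }
    }
}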

The application runs for about 10 hours before it fails with the Kerberos error "Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]".

The flink-conf.yaml has the following properties set:
security.kerberos.login.keytab: <keytab location>
security.kerberos.login.principal: <principal>
security.kerberos.login.contexts: Client,KafkaClien

Any help would be appreciated.


Thanks,
Prabhu

Re: kerberos yarn - failure in long running streaming application

Tzu-Li (Gordon) Tai
Hi,

At first glance this seems odd, since a keytab would not expire unless the principal's password expires or is changed.

Was the principal's password set to expire, or was it changed? The keytab would also expire in that case.

Cheers,
Gordon

Re: kerberos yarn - failure in long running streaming application

Ted Yu
In reply to this post by vprabhu@gmail.com
bq. security.kerberos.login.contexts: Client,KafkaClien

Just curious: there is a 't' missing at the end of the above line.

Maybe a typo when composing the email?

Re: kerberos yarn - failure in long running streaming application

Eron Wright
It sounds to me like the TGT is expiring (usually after 12 hours). This shouldn't happen in the keytab scenario, because a background thread provided by Hadoop periodically performs a re-login using the keytab. More details are in Hadoop's UserGroupInformation internals.
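
For reference, the keytab login that this background thread depends on boils down to something like the following sketch. The principal and keytab path are placeholders, and Flink performs the equivalent internally when security.kerberos.login.keytab is configured:

import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;

public class KeytabLoginSketch {
    public static void main(String[] args) throws IOException {
        // Standard Hadoop keytab login (placeholder principal and path).
        UserGroupInformation.loginUserFromKeytab(
                "flink/host@EXAMPLE.COM",
                "/etc/security/keytabs/flink.keytab");

        // Long-running processes can also defensively re-login before the
        // TGT lapses; this is a no-op while the ticket is still fresh.
        UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
    }
}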

To help narrow down the issue:
1. please share the stack trace (and does the error occur on the Job Manager or on a Task Manager?)
2. is kinit being called on the client prior to calling `flink run`?  (just curious)
3. are you willing to share the Flink logs?

I'm happy to help if you prefer to share the logs privately.

-Eron

Re: kerberos yarn - failure in long running streaming application

vprabhu@gmail.com
Thanks for helping fix the issue, Eron.

//Eron's email on this issue

I see two interesting things in the log.  One, the TGT has an expiry of 10 hours, according to the Kafka log output:

> 2017-08-13 06:14:48,248 INFO  org.apache.kafka.common.security.kerberos.Login               - TGT valid starting at: Sun Aug 13 06:14:48 UTC 2017

> 2017-08-13 06:14:48,249 INFO  org.apache.kafka.common.security.kerberos.Login               - TGT expires: Sun Aug 13 16:14:48 UTC 2017


So we can say that this problem is related to the relogin thread (or lack thereof).


Two, the reason that renewal isn't working is probably that Hadoop 2.3 code is being used by Flink:


> 2017-08-13 06:14:40,044 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.3.0


The Hadoop dependencies are shaded inside Flink's libraries.   The CDH libraries that I see on the classpath aren't really used, AFAIK.    Maybe try using a build of Flink based on Hadoop 2.6 to match the CDH environment.

Hope this helps!

//End email


The issue was an incorrect Hadoop version resulting from the way I built the project. My build configuration includes the following dependencies:
    compile 'org.apache.flink:flink-streaming-java_2.10:1.3.2'
    compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.3.2'
    compile 'org.apache.flink:flink-connector-filesystem_2.10:1.3.2'

I am using Flink built for Hadoop 2.6 (flink-1.3.2-bin-hadoop26-scala_2.10.tgz).

I did NOT build an uber jar; instead, I included the dependency jars in the runtime classpath. The dependencies included flink-shaded-hadoop2-1.3.2.jar, which contained classes from Hadoop 2.3 and was causing the issue. Removing this jar from the classpath fixed the issue; the Hadoop version is now 2.6.3.
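
For anyone hitting the same problem, a quick sanity check along these lines confirms which Hadoop version (and which jar) actually wins on the classpath; the class name here is just illustrative:

import org.apache.hadoop.util.VersionInfo;

public class HadoopClasspathCheck {
    public static void main(String[] args) {
        // The Hadoop version present on the classpath.
        System.out.println("Hadoop version: " + VersionInfo.getVersion());
        // The jar the Hadoop classes were actually loaded from.
        System.out.println(VersionInfo.class.getProtectionDomain()
                .getCodeSource().getLocation());
    }
}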

Thanks,
Prabhu
