Flink job throws ClassNotFoundException on job restart


none none

I'm running Flink on YARN with two TaskManagers. I wrote a simple job that consumes messages from Kafka. The job runs on TaskManager 1. When I kill TaskManager 1 (via kill PID), the job is restarted on TaskManager 2. So far, so good. But right after the consumer starts, the execution fails:

java.lang.RuntimeException: Could not deserialize NFA.
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:83)
    ... 8 more

I build the jar file with:

mvn clean package -Pbuild-jar

I also tried this, but it makes no difference:

mvn clean package

It's strange that my job runs fine on the first attempt but throws ClassNotFoundExceptions (CNFEs) on job restart. What am I doing wrong? (I'm using Flink 1.2-SNAPSHOT because I need the BucketSink.) I compared the classpaths of both TaskManagers, and they are identical.
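
A note on reading the trace: the failing lookup passes through sun.misc.Launcher$AppClassLoader, the JVM's application classloader, which usually cannot see classes that exist only in a submitted job jar. That is one plausible reading of the stack trace, not something confirmed in this thread. The sketch below shows the general Java technique of resolving classes through an explicitly supplied classloader during deserialization. The names ClassLoaderAwareDeserialization and LoaderAwareObjectInputStream are hypothetical and this is not Flink's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper class for illustration only; not part of Flink.
final class ClassLoaderAwareDeserialization {

    // An ObjectInputStream that resolves classes through a caller-supplied
    // classloader instead of the loader picked by the default implementation.
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied loader first.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    // Serializes a value with standard Java serialization.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Deserializes a value, resolving its classes through the given classloader.
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> original = new ArrayList<>(Arrays.asList("a", "b"));
        byte[] data = serialize(original);
        // In a real job this would be the loader that knows the job's classes.
        ClassLoader loader = ClassLoaderAwareDeserialization.class.getClassLoader();
        Object restored = deserialize(data, loader);
        System.out.println(restored.equals(original));
    }
}
```

With the default resolveClass, the classloader is chosen from the calling code on the stack, so state written by user code can fail to load when the read happens inside framework code; passing the loader explicitly removes that dependency.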


Re: Flink job throws ClassNotFoundException on job restart

Stephan Ewen
Hi!

The master branch has a temporary regression caused by work in progress on the "changing parallelism of savepoints" feature.
We'll try to complete the change today; after that it should work again.

Sorry for the inconvenience. Can you work with a revision from last week for today?

Stephan


On Wed, Oct 5, 2016 at 11:50 AM, none none <[hidden email]> wrote:


Re: Flink job throws ClassNotFoundException on job restart

none none
Hi Stephan,

Thanks for the quick answer! I'll try going back to an older revision.

Best,
Max

2016-10-05 12:10 GMT+02:00 Stephan Ewen <[hidden email]>:

Re: Flink job throws ClassNotFoundException on job restart

none none
I went back to commit 2afc092461cf68cf0f3c26a3ab4c58a7bd68cf71 on master, and it seems to work.

2016-10-05 15:48 GMT+02:00 static-max <[hidden email]>: